From 3c0abdcd7b612040e8f7f34a06e4a0d7ea19a448 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 7 Dec 2025 14:42:39 -0800 Subject: [PATCH 1/4] Document using originated time for TableDump v2 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39e48083..90cfae78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file. * Add bounds checks throughout parsers to avoid overread/advance/split_to panics * Handle invalid MRT BGP4MP_ET header length gracefully (reject ET records with on-wire length < 4) +* Use originated time instead of MRT header time for TableDump (v2) messages ([#252](https://github.com/bgpkit/bgpkit-parser/pull/252)) ### Tooling and benchmarking From 2d24fe5792fe5692cd8233620f118580af95da24 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 7 Dec 2025 15:12:50 -0800 Subject: [PATCH 2/4] Add UpdateIterator and FallibleUpdateIterator Add MrtUpdate enum plus Bgp4MpUpdate and TableDumpV2Entry structs and BgpkitParser::into_update_iter() and into_fallible_update_iter() to iterate BGP4MP UPDATEs and TableDumpV2 RIB entries. Include example, tests and a benchmark. --- CHANGELOG.md | 10 + README.md | 79 +++++-- benches/internals.rs | 26 +++ examples/README.md | 1 + examples/update_messages_iter.rs | 185 +++++++++++++++ src/lib.rs | 79 +++++-- src/parser/iters/mod.rs | 78 +++++++ src/parser/iters/update.rs | 383 +++++++++++++++++++++++++++++++ 8 files changed, 813 insertions(+), 28 deletions(-) create mode 100644 examples/update_messages_iter.rs create mode 100644 src/parser/iters/update.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 90cfae78..ac108746 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file. ## Unreleased +### New features + +* Add `UpdateIterator` and `FallibleUpdateIterator` for iterating over BGP announcements ([#250](https://github.com/bgpkit/bgpkit-parser/issues/250)) + - New `MrtUpdate` enum supporting both BGP4MP UPDATE messages and TableDumpV2 RIB entries + - `Bgp4MpUpdate` struct for BGP4MP UPDATE messages with metadata (timestamp, peer_ip, peer_asn) + - `TableDumpV2Entry` struct for RIB dump entries (prefix with multiple RIB entries per peer) + - `into_update_iter()` and `into_fallible_update_iter()` methods on `BgpkitParser` + - Middle ground between `MrtRecord` and `BgpElem` for more efficient processing +* Add `update_messages_iter` example demonstrating the new iterator + ### Testing and fuzzing * Add cargo-fuzz harness and initial fuzz targets (mrt_record, bgp_message, parser) diff --git a/README.md b/README.md index 3c902478..f2f30210 100644 --- a/README.md +++ b/README.md @@ -406,6 +406,7 @@ files.par_iter().for_each(|file| { #### Choose the right data structure - Use [MrtRecord] iteration for minimal memory overhead +- Use [MrtUpdate] for efficient batch processing without per-prefix attribute duplication - Use [BgpElem] for easier per-prefix analysis - See [Data Representation](#data-representation) for detailed comparison @@ -508,7 +509,7 @@ bgpkit-parser -o 13335 -m a -4 updates.bz2 ## Data Representation -BGPKIT Parser provides two ways to access parsed BGP data: [MrtRecord] and [BgpElem]. Choose based on your needs: +BGPKIT Parser provides three ways to access parsed BGP data: [MrtRecord], [MrtUpdate], and [BgpElem]. Choose based on your needs: ``` ┌──────────────────────────────────────────────┐ @@ -518,15 +519,15 @@ BGPKIT Parser provides two ways to access parsed BGP data: [MrtRecord] and [BgpE │ ├──> Parser │ - ┌─────────────┴────────────────┐ - │ │ - ▼ ▼ - [MrtRecord] [BgpElem] - (Low-level) (High-level) - │ │ - └──────────────┬───────────────┘ - │ - ▼ + ┌──────────────┼────────────────┐ + │ │ │ + ▼ ▼ ▼ + [MrtRecord] [MrtUpdate] [BgpElem] + (Low-level) (Intermediate) (High-level) + │ │ │ + └─────────────┴────────────────┘ + │ + ▼ Your Analysis Code ``` @@ -546,6 +547,48 @@ See the [MrtRecord] documentation for the complete structure definition. **Iteration**: Use [`BgpkitParser::into_record_iter()`] to iterate over [MrtRecord]s. +### [MrtUpdate]: Intermediate Message-Level Representation + +[MrtUpdate] provides access to BGP announcements without expanding them into individual per-prefix elements. This is a middle ground between [MrtRecord] and [BgpElem]. Use this when you need: +- **Efficient batch processing**: Avoid duplicating attributes across prefixes +- **Message-level analysis**: Work with UPDATE messages or RIB entries as units +- **Memory efficiency**: Shared attributes aren't cloned for each prefix + +**Supported message types** (via enum variants): +- `Bgp4MpUpdate`: BGP UPDATE messages from UPDATES files +- `TableDumpV2Entry`: RIB entries from TableDumpV2 RIB dumps +- `TableDumpMessage`: Legacy TableDump v1 messages + +**Example**: +```rust +use bgpkit_parser::{BgpkitParser, MrtUpdate}; + +let parser = BgpkitParser::new("updates.mrt.bz2").unwrap(); +for update in parser.into_update_iter() { + match update { + MrtUpdate::Bgp4MpUpdate(u) => { + // One UPDATE message may contain multiple prefixes sharing attributes + println!("Peer {} announced {} prefixes", + u.peer_ip, + u.message.announced_prefixes.len() + ); + } + MrtUpdate::TableDumpV2Entry(e) => { + // One prefix with multiple RIB entries (one per peer) + println!("Prefix {} seen by {} peers", + e.prefix, + e.rib_entries.len() + ); + } + MrtUpdate::TableDumpMessage(m) => { + println!("Legacy table dump for {}", m.prefix); + } + } +} +``` + +**Iteration**: Use [`BgpkitParser::into_update_iter()`] to iterate over [MrtUpdate]s. + ### [BgpElem]: High-level Per-Prefix Representation [BgpElem] provides a simplified, per-prefix view of BGP data. Each [BgpElem] represents a single prefix announcement or withdrawal. Use this when you want: @@ -589,10 +632,18 @@ See the [BgpElem] documentation for the complete structure definition. ### Which One Should I Use? -- **Use [BgpElem]** (default): For most BGP analysis tasks, prefix tracking, AS path analysis -- **Use [MrtRecord]**: When you need MRT format details, re-encoding, or minimal memory overhead - -**Memory trade-off**: [BgpElem] duplicates shared attributes (AS path, communities) for each prefix, consuming more memory but providing simpler analysis. +| Use Case | Recommended | Why | +|----------|-------------|-----| +| Simple prefix analysis | [BgpElem] | Easy per-prefix access, format-agnostic | +| High-performance processing | [MrtUpdate] | Avoids attribute duplication overhead | +| Counting prefixes per UPDATE | [MrtUpdate] | Direct access to message structure | +| Re-encoding MRT data | [MrtRecord] | Preserves complete MRT structure | +| MRT format-specific details | [MrtRecord] | Access to peer index tables, geo-location, etc. | + +**Memory trade-off**: +- [BgpElem] duplicates shared attributes (AS path, communities) for each prefix +- [MrtUpdate] keeps attributes shared within each message/entry +- [MrtRecord] has minimal overhead but requires more code to extract BGP data ## RFCs Support diff --git a/benches/internals.rs b/benches/internals.rs index b85ce5d6..8d27e47a 100644 --- a/benches/internals.rs +++ b/benches/internals.rs @@ -78,6 +78,19 @@ pub fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("updates into_update_iter", |b| { + b.iter(|| { + let mut reader = black_box(&updates[..]); + + BgpkitParser::from_reader(&mut reader) + .into_update_iter() + .take(RECORD_LIMIT) + .for_each(|x| { + black_box(x); + }); + }) + }); + c.bench_function("rib into_record_iter", |b| { b.iter(|| { let mut reader = black_box(&rib_dump[..]); @@ -103,6 +116,19 @@ pub fn criterion_benchmark(c: &mut Criterion) { }); }) }); + + c.bench_function("rib into_update_iter", |b| { + b.iter(|| { + let mut reader = black_box(&rib_dump[..]); + + BgpkitParser::from_reader(&mut reader) + .into_update_iter() + .take(RECORD_LIMIT) + .for_each(|x| { + black_box(x); + }); + }) + }); } criterion_group! { diff --git a/examples/README.md b/examples/README.md index 2a468abf..7807f6f9 100644 --- a/examples/README.md +++ b/examples/README.md @@ -8,6 +8,7 @@ This directory contains runnable examples for bgpkit_parser. They demonstrate ba - [display_elems.rs](display_elems.rs) — Print selected fields from each BGP element in a compact, pipe_delimited format. - [count_elems.rs](count_elems.rs) — Count the total number of BGP elements in a given file. - [records_iter.rs](records_iter.rs) — Iterate over raw MRT records and inspect/update messages; includes an example of detecting the Only_To_Customer (OTC) attribute. +- [update_messages_iter.rs](update_messages_iter.rs) — Iterate over BGP announcements using the intermediate MrtUpdate representation; compares performance with BgpElem iteration and works with both UPDATES files and RIB dumps. - [scan_mrt.rs](scan_mrt.rs) — CLI_style scanner that quickly walks an MRT file, counting raw records, parsed records, or elements without processing them. ## Filtering and Policy Examples diff --git a/examples/update_messages_iter.rs b/examples/update_messages_iter.rs new file mode 100644 index 00000000..69426cf4 --- /dev/null +++ b/examples/update_messages_iter.rs @@ -0,0 +1,185 @@ +//! Example demonstrating the use of `into_update_iter()` for processing BGP announcements. +//! +//! The `UpdateIterator` provides a middle ground between `RecordIterator` and `ElemIterator`: +//! - More focused than `RecordIterator` as it only yields BGP announcements +//! - More efficient than `ElemIterator` as it avoids duplicating attributes for each prefix +//! +//! This iterator handles both: +//! - **BGP4MP UPDATE messages** from UPDATES files (real-time updates) +//! - **TableDumpV2 RIB entries** from RIB dump files (routing table snapshots) +//! +//! This example compares the performance of `UpdateIterator` vs `ElemIterator` when counting +//! announced and withdrawn prefixes, and verifies that both approaches yield the same results. +//! +//! Run with: cargo run --example update_messages_iter --release + +use bgpkit_parser::models::{AttributeValue, ElemType}; +use bgpkit_parser::{BgpkitParser, MrtUpdate}; +use std::time::Instant; + +fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + // You can test with either an UPDATES file or a RIB dump file: + // UPDATES file (BGP4MP messages): + // let url = "https://archive.routeviews.org/bgpdata/2024.11/UPDATES/updates.20241101.0000.bz2"; + // RIB dump file (TableDumpV2 messages): + let url = "https://archive.routeviews.org/bgpdata/2024.11/RIBS/rib.20241101.0000.bz2"; + + log::info!("Parsing MRT file: {}", url); + log::info!(""); + + // ======================================== + // Method 1: Using UpdateIterator + // ======================================== + log::info!("=== Method 1: UpdateIterator ==="); + + let start = Instant::now(); + let parser = BgpkitParser::new(url).unwrap(); + + let mut bgp4mp_update_count = 0; + let mut rib_entry_count = 0; + let mut table_dump_v1_count = 0; + let mut update_iter_announced = 0; + let mut update_iter_withdrawn = 0; + + for update in parser.into_update_iter() { + match update { + MrtUpdate::Bgp4MpUpdate(update) => { + bgp4mp_update_count += 1; + + // Count announced prefixes (both from announced_prefixes and MP_REACH_NLRI) + let announced_count = update.message.announced_prefixes.len(); + let mp_reach_count: usize = update + .message + .attributes + .iter() + .filter_map(|attr| { + if let AttributeValue::MpReachNlri(nlri) = attr { + Some(nlri.prefixes.len()) + } else { + None + } + }) + .sum(); + update_iter_announced += announced_count + mp_reach_count; + + // Count withdrawn prefixes (both from withdrawn_prefixes and MP_UNREACH_NLRI) + let withdrawn_count = update.message.withdrawn_prefixes.len(); + let mp_unreach_count: usize = update + .message + .attributes + .iter() + .filter_map(|attr| { + if let AttributeValue::MpUnreachNlri(nlri) = attr { + Some(nlri.prefixes.len()) + } else { + None + } + }) + .sum(); + update_iter_withdrawn += withdrawn_count + mp_unreach_count; + } + MrtUpdate::TableDumpV2Entry(entry) => { + rib_entry_count += 1; + // In TableDumpV2, each entry represents ONE prefix with multiple RIB entries (one per peer) + // Each RIB entry is an announcement of that prefix + update_iter_announced += entry.rib_entries.len(); + } + MrtUpdate::TableDumpMessage(_msg) => { + table_dump_v1_count += 1; + // Legacy TableDump v1: one record = one prefix = one announcement + update_iter_announced += 1; + } + } + } + + let update_iter_duration = start.elapsed(); + + log::info!("Message counts:"); + log::info!(" - BGP4MP UPDATE messages: {}", bgp4mp_update_count); + log::info!(" - TableDumpV2 RIB entries: {}", rib_entry_count); + log::info!(" - TableDump v1 messages: {}", table_dump_v1_count); + log::info!("Total announced prefixes: {}", update_iter_announced); + log::info!("Total withdrawn prefixes: {}", update_iter_withdrawn); + log::info!("Time elapsed: {:?}", update_iter_duration); + log::info!(""); + + // ======================================== + // Method 2: Using ElemIterator + // ======================================== + log::info!("=== Method 2: ElemIterator ==="); + + let start = Instant::now(); + let parser = BgpkitParser::new(url).unwrap(); + + let mut elem_count = 0; + let mut elem_iter_announced = 0; + let mut elem_iter_withdrawn = 0; + + for elem in parser.into_elem_iter() { + elem_count += 1; + + match elem.elem_type { + ElemType::ANNOUNCE => elem_iter_announced += 1, + ElemType::WITHDRAW => elem_iter_withdrawn += 1, + } + } + + let elem_iter_duration = start.elapsed(); + + log::info!("Total BGP elements: {}", elem_count); + log::info!("Total announced prefixes: {}", elem_iter_announced); + log::info!("Total withdrawn prefixes: {}", elem_iter_withdrawn); + log::info!("Time elapsed: {:?}", elem_iter_duration); + log::info!(""); + + // ======================================== + // Comparison + // ======================================== + log::info!("=== Comparison ==="); + + let announced_match = update_iter_announced == elem_iter_announced; + let withdrawn_match = update_iter_withdrawn == elem_iter_withdrawn; + + log::info!( + "Announced prefixes match: {} (UpdateIter: {}, ElemIter: {})", + if announced_match { "✓" } else { "✗" }, + update_iter_announced, + elem_iter_announced + ); + log::info!( + "Withdrawn prefixes match: {} (UpdateIter: {}, ElemIter: {})", + if withdrawn_match { "✓" } else { "✗" }, + update_iter_withdrawn, + elem_iter_withdrawn + ); + + if update_iter_duration.as_nanos() > 0 { + let speedup = elem_iter_duration.as_secs_f64() / update_iter_duration.as_secs_f64(); + log::info!( + "Performance: UpdateIterator is {:.2}x {} than ElemIterator", + if speedup >= 1.0 { + speedup + } else { + 1.0 / speedup + }, + if speedup >= 1.0 { "faster" } else { "slower" } + ); + } + log::info!(" - UpdateIterator: {:?}", update_iter_duration); + log::info!(" - ElemIterator: {:?}", elem_iter_duration); + + // Assert counts match + assert_eq!( + update_iter_announced, elem_iter_announced, + "Announced prefix counts should match!" + ); + assert_eq!( + update_iter_withdrawn, elem_iter_withdrawn, + "Withdrawn prefix counts should match!" + ); + + log::info!(""); + log::info!("All counts verified successfully!"); +} diff --git a/src/lib.rs b/src/lib.rs index da387381..a09046b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -402,6 +402,7 @@ files.par_iter().for_each(|file| { ### Choose the right data structure - Use [MrtRecord] iteration for minimal memory overhead +- Use [MrtUpdate] for efficient batch processing without per-prefix attribute duplication - Use [BgpElem] for easier per-prefix analysis - See [Data Representation](#data-representation) for detailed comparison @@ -504,7 +505,7 @@ bgpkit-parser -o 13335 -m a -4 updates.bz2 # Data Representation -BGPKIT Parser provides two ways to access parsed BGP data: [MrtRecord] and [BgpElem]. Choose based on your needs: +BGPKIT Parser provides three ways to access parsed BGP data: [MrtRecord], [MrtUpdate], and [BgpElem]. Choose based on your needs: ```text ┌──────────────────────────────────────────────┐ @@ -514,15 +515,15 @@ BGPKIT Parser provides two ways to access parsed BGP data: [MrtRecord] and [BgpE │ ├──> Parser │ - ┌─────────────┴────────────────┐ - │ │ - ▼ ▼ - [MrtRecord] [BgpElem] - (Low-level) (High-level) - │ │ - └──────────────┬───────────────┘ - │ - ▼ + ┌──────────────┼────────────────┐ + │ │ │ + ▼ ▼ ▼ + [MrtRecord] [MrtUpdate] [BgpElem] + (Low-level) (Intermediate) (High-level) + │ │ │ + └─────────────┴────────────────┘ + │ + ▼ Your Analysis Code ``` @@ -542,6 +543,48 @@ See the [MrtRecord] documentation for the complete structure definition. **Iteration**: Use [`BgpkitParser::into_record_iter()`] to iterate over [MrtRecord]s. +## [MrtUpdate]: Intermediate Message-Level Representation + +[MrtUpdate] provides access to BGP announcements without expanding them into individual per-prefix elements. This is a middle ground between [MrtRecord] and [BgpElem]. Use this when you need: +- **Efficient batch processing**: Avoid duplicating attributes across prefixes +- **Message-level analysis**: Work with UPDATE messages or RIB entries as units +- **Memory efficiency**: Shared attributes aren't cloned for each prefix + +**Supported message types** (via enum variants): +- `Bgp4MpUpdate`: BGP UPDATE messages from UPDATES files +- `TableDumpV2Entry`: RIB entries from TableDumpV2 RIB dumps +- `TableDumpMessage`: Legacy TableDump v1 messages + +**Example**: +```no_run +use bgpkit_parser::{BgpkitParser, MrtUpdate}; + +let parser = BgpkitParser::new("updates.mrt.bz2").unwrap(); +for update in parser.into_update_iter() { + match update { + MrtUpdate::Bgp4MpUpdate(u) => { + // One UPDATE message may contain multiple prefixes sharing attributes + println!("Peer {} announced {} prefixes", + u.peer_ip, + u.message.announced_prefixes.len() + ); + } + MrtUpdate::TableDumpV2Entry(e) => { + // One prefix with multiple RIB entries (one per peer) + println!("Prefix {} seen by {} peers", + e.prefix, + e.rib_entries.len() + ); + } + MrtUpdate::TableDumpMessage(m) => { + println!("Legacy table dump for {}", m.prefix); + } + } +} +``` + +**Iteration**: Use [`BgpkitParser::into_update_iter()`] to iterate over [MrtUpdate]s. + ## [BgpElem]: High-level Per-Prefix Representation [BgpElem] provides a simplified, per-prefix view of BGP data. Each [BgpElem] represents a single prefix announcement or withdrawal. Use this when you want: @@ -585,10 +628,18 @@ See the [BgpElem] documentation for the complete structure definition. ## Which One Should I Use? -- **Use [BgpElem]** (default): For most BGP analysis tasks, prefix tracking, AS path analysis -- **Use [MrtRecord]**: When you need MRT format details, re-encoding, or minimal memory overhead - -**Memory trade-off**: [BgpElem] duplicates shared attributes (AS path, communities) for each prefix, consuming more memory but providing simpler analysis. +| Use Case | Recommended | Why | +|----------|-------------|-----| +| Simple prefix analysis | [BgpElem] | Easy per-prefix access, format-agnostic | +| High-performance processing | [MrtUpdate] | Avoids attribute duplication overhead | +| Counting prefixes per UPDATE | [MrtUpdate] | Direct access to message structure | +| Re-encoding MRT data | [MrtRecord] | Preserves complete MRT structure | +| MRT format-specific details | [MrtRecord] | Access to peer index tables, geo-location, etc. | + +**Memory trade-off**: +- [BgpElem] duplicates shared attributes (AS path, communities) for each prefix +- [MrtUpdate] keeps attributes shared within each message/entry +- [MrtRecord] has minimal overhead but requires more code to extract BGP data # RFCs Support diff --git a/src/parser/iters/mod.rs b/src/parser/iters/mod.rs index 5f2154af..df94c0fe 100644 --- a/src/parser/iters/mod.rs +++ b/src/parser/iters/mod.rs @@ -4,6 +4,7 @@ Iterator implementations for bgpkit-parser. This module contains different iterator implementations for parsing BGP data: - `default`: Standard iterators that skip errors (RecordIterator, ElemIterator) - `fallible`: Fallible iterators that return Results (FallibleRecordIterator, FallibleElemIterator) +- `update`: Iterators for BGP UPDATE messages (UpdateIterator, FallibleUpdateIterator) It also contains the trait implementations that enable BgpkitParser to be used with Rust's iterator syntax. @@ -12,11 +13,15 @@ Rust's iterator syntax. pub mod default; pub mod fallible; mod raw; +mod update; // Re-export all iterator types for convenience pub use default::{ElemIterator, RecordIterator}; pub use fallible::{FallibleElemIterator, FallibleRecordIterator}; pub use raw::RawRecordIterator; +pub use update::{ + Bgp4MpUpdate, FallibleUpdateIterator, MrtUpdate, TableDumpV2Entry, UpdateIterator, +}; use crate::models::BgpElem; use crate::parser::BgpkitParser; @@ -45,6 +50,49 @@ impl BgpkitParser { RawRecordIterator::new(self) } + /// Creates an iterator over BGP announcements from MRT data. + /// + /// This iterator yields `MrtUpdate` items from both UPDATES files (BGP4MP messages) + /// and RIB dump files (TableDump/TableDumpV2 messages). It's a middle ground + /// between `into_record_iter()` and `into_elem_iter()`: + /// + /// - More focused than `into_record_iter()` as it only returns BGP announcements + /// - More efficient than `into_elem_iter()` as it doesn't duplicate attributes per prefix + /// + /// The iterator returns an `MrtUpdate` enum with variants: + /// - `Bgp4MpUpdate`: BGP UPDATE messages from UPDATES files + /// - `TableDumpV2Entry`: RIB entries from TableDumpV2 RIB dumps + /// - `TableDumpMessage`: Legacy TableDump v1 messages + /// + /// # Example + /// ```no_run + /// use bgpkit_parser::{BgpkitParser, MrtUpdate}; + /// + /// let parser = BgpkitParser::new("updates.mrt").unwrap(); + /// for update in parser.into_update_iter() { + /// match update { + /// MrtUpdate::Bgp4MpUpdate(u) => { + /// println!("Peer {} announced {} prefixes", + /// u.peer_ip, + /// u.message.announced_prefixes.len() + /// ); + /// } + /// MrtUpdate::TableDumpV2Entry(e) => { + /// println!("RIB entry for {} with {} peers", + /// e.prefix, + /// e.rib_entries.len() + /// ); + /// } + /// MrtUpdate::TableDumpMessage(m) => { + /// println!("Legacy table dump for {}", m.prefix); + /// } + /// } + /// } + /// ``` + pub fn into_update_iter(self) -> UpdateIterator { + UpdateIterator::new(self) + } + /// Creates a fallible iterator over MRT records that returns parsing errors. /// /// # Example @@ -90,4 +138,34 @@ impl BgpkitParser { pub fn into_fallible_elem_iter(self) -> FallibleElemIterator { FallibleElemIterator::new(self) } + + /// Creates a fallible iterator over BGP announcements that returns parsing errors. + /// + /// Unlike the default `into_update_iter()`, this iterator returns + /// `Result` allowing users to handle parsing + /// errors explicitly instead of having them logged and skipped. + /// + /// # Example + /// ```no_run + /// use bgpkit_parser::{BgpkitParser, MrtUpdate}; + /// + /// let parser = BgpkitParser::new("updates.mrt").unwrap(); + /// for result in parser.into_fallible_update_iter() { + /// match result { + /// Ok(MrtUpdate::Bgp4MpUpdate(update)) => { + /// println!("Peer {} announced {} prefixes", + /// update.peer_ip, + /// update.message.announced_prefixes.len() + /// ); + /// } + /// Ok(_) => { /* handle other variants */ } + /// Err(e) => { + /// eprintln!("Error parsing: {}", e); + /// } + /// } + /// } + /// ``` + pub fn into_fallible_update_iter(self) -> FallibleUpdateIterator { + FallibleUpdateIterator::new(self) + } } diff --git a/src/parser/iters/update.rs b/src/parser/iters/update.rs new file mode 100644 index 00000000..95f5c2b1 --- /dev/null +++ b/src/parser/iters/update.rs @@ -0,0 +1,383 @@ +/*! +Update message iterator implementation. + +This module provides iterators that yield BGP announcement data from MRT files, +supporting both BGP4MP UPDATE messages and RIB dump entries. + +## Overview + +The iterators in this module provide a middle ground between `MrtRecord` and `BgpElem`: +- More focused than `MrtRecord` as they only yield BGP announcements +- More efficient than `BgpElem` as they avoid duplicating attributes for each prefix + +## Message Types + +### BGP4MP Updates (from UPDATES files) +- One message contains multiple prefixes sharing the SAME attributes +- Efficient when you need to process updates without per-prefix attribute cloning + +### RIB Entries (from RIB dump files) +- One record contains ONE prefix with multiple RIB entries (one per peer) +- Each peer has its own attributes for the same prefix + +## Usage + +```no_run +use bgpkit_parser::BgpkitParser; + +let parser = BgpkitParser::new("updates.mrt").unwrap(); +for announcement in parser.into_update_iter() { + match announcement { + bgpkit_parser::MrtUpdate::Bgp4MpUpdate(update) => { + println!("BGP UPDATE from peer {}", update.peer_ip); + } + bgpkit_parser::MrtUpdate::TableDumpV2Entry(entry) => { + println!("RIB entry for prefix {}", entry.prefix); + } + bgpkit_parser::MrtUpdate::TableDumpMessage(msg) => { + println!("Legacy table dump for prefix {}", msg.prefix); + } + } +} +``` +*/ +use crate::error::ParserError; +use crate::models::*; +use crate::parser::BgpkitParser; +use crate::Elementor; +use log::{error, warn}; +use std::io::Read; +use std::net::IpAddr; + +/// A BGP4MP UPDATE message with associated metadata. +/// +/// This struct wraps a `BgpUpdateMessage` with the peer information and timestamp +/// from the MRT record. It's more efficient than `BgpElem` when a single UPDATE +/// contains multiple prefixes, as the attributes are not duplicated. +#[derive(Debug, Clone, PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Bgp4MpUpdate { + /// The timestamp of the MRT record in floating-point format (seconds since epoch). + pub timestamp: f64, + /// The IP address of the BGP peer that sent this update. + pub peer_ip: IpAddr, + /// The ASN of the BGP peer that sent this update. + pub peer_asn: Asn, + /// The BGP UPDATE message containing announcements, withdrawals, and attributes. + pub message: BgpUpdateMessage, +} + +/// A TableDumpV2 RIB entry with associated metadata. +/// +/// This struct represents a single prefix with all its RIB entries from different peers. +/// Each RIB entry contains the peer information and attributes for that prefix. +#[derive(Debug, Clone, PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct TableDumpV2Entry { + /// The timestamp from the MRT header. + pub timestamp: f64, + /// The RIB subtype (IPv4 Unicast, IPv6 Unicast, etc.) + pub rib_type: TableDumpV2Type, + /// The sequence number of this RIB entry. + pub sequence_number: u32, + /// The network prefix for this RIB entry. + pub prefix: NetworkPrefix, + /// The RIB entries for this prefix, one per peer. + /// Each entry contains peer_index, originated_time, and attributes. + pub rib_entries: Vec, +} + +/// Unified enum representing BGP announcements from different MRT message types. +/// +/// This enum provides a common interface for processing BGP data from: +/// - BGP4MP UPDATE messages (real-time updates) +/// - TableDumpV2 RIB entries (routing table snapshots) +/// - Legacy TableDump messages +#[derive(Debug, Clone, PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum MrtUpdate { + /// A BGP4MP UPDATE message from an UPDATES file. + Bgp4MpUpdate(Bgp4MpUpdate), + /// A TableDumpV2 RIB entry from a RIB dump file. + TableDumpV2Entry(TableDumpV2Entry), + /// A legacy TableDump (v1) message. + TableDumpMessage(TableDumpMessage), +} + +impl MrtUpdate { + /// Returns the timestamp of this update/entry. + pub fn timestamp(&self) -> f64 { + match self { + MrtUpdate::Bgp4MpUpdate(u) => u.timestamp, + MrtUpdate::TableDumpV2Entry(e) => e.timestamp, + MrtUpdate::TableDumpMessage(m) => m.originated_time as f64, + } + } +} + +/// Iterator over BGP announcements from MRT data. +/// +/// This iterator yields `MrtUpdate` items from both UPDATES files (BGP4MP messages) +/// and RIB dump files (TableDump/TableDumpV2 messages). +/// +/// Unlike `ElemIterator`, this iterator does not expand messages into individual +/// `BgpElem`s, making it more efficient for use cases that need to process +/// the raw message structures. +pub struct UpdateIterator { + parser: BgpkitParser, + elementor: Elementor, +} + +impl UpdateIterator { + pub(crate) fn new(parser: BgpkitParser) -> Self { + UpdateIterator { + parser, + elementor: Elementor::new(), + } + } +} + +impl Iterator for UpdateIterator { + type Item = MrtUpdate; + + fn next(&mut self) -> Option { + loop { + let record = match self.parser.next_record() { + Ok(record) => record, + Err(e) => match e.error { + ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => { + if self.parser.options.show_warnings { + warn!("parser warn: {}", err_str); + } + continue; + } + ParserError::ParseError(err_str) => { + error!("parser error: {}", err_str); + if self.parser.core_dump { + if let Some(bytes) = e.bytes { + std::fs::write("mrt_core_dump", bytes) + .expect("Unable to write to mrt_core_dump"); + } + return None; + } + continue; + } + ParserError::EofExpected => return None, + ParserError::IoError(err) | ParserError::EofError(err) => { + error!("{:?}", err); + if self.parser.core_dump { + if let Some(bytes) = e.bytes { + std::fs::write("mrt_core_dump", bytes) + .expect("Unable to write to mrt_core_dump"); + } + } + return None; + } + #[cfg(feature = "oneio")] + ParserError::OneIoError(_) => return None, + ParserError::FilterError(_) => return None, + }, + }; + + let t = record.common_header.timestamp; + let timestamp: f64 = if let Some(micro) = &record.common_header.microsecond_timestamp { + let m = (*micro as f64) / 1_000_000.0; + t as f64 + m + } else { + f64::from(t) + }; + + match record.message { + MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(msg)) => { + if let BgpMessage::Update(update) = msg.bgp_message { + return Some(MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate { + timestamp, + peer_ip: msg.peer_ip, + peer_asn: msg.peer_asn, + message: update, + })); + } + // Not an UPDATE message (OPEN, NOTIFICATION, KEEPALIVE), continue + continue; + } + MrtMessage::Bgp4Mp(Bgp4MpEnum::StateChange(_)) => { + // State change messages don't contain announcement data + continue; + } + MrtMessage::TableDumpV2Message(msg) => { + match msg { + TableDumpV2Message::PeerIndexTable(p) => { + // Store peer table for later use and continue + self.elementor.peer_table = Some(p); + continue; + } + TableDumpV2Message::RibAfi(entries) => { + return Some(MrtUpdate::TableDumpV2Entry(TableDumpV2Entry { + timestamp, + rib_type: entries.rib_type, + sequence_number: entries.sequence_number, + prefix: entries.prefix, + rib_entries: entries.rib_entries, + })); + } + TableDumpV2Message::RibGeneric(_) => { + // RibGeneric is not commonly used, skip for now + continue; + } + TableDumpV2Message::GeoPeerTable(_) => { + // GeoPeerTable doesn't contain route data + continue; + } + } + } + MrtMessage::TableDumpMessage(msg) => { + return Some(MrtUpdate::TableDumpMessage(msg)); + } + } + } + } +} + +/// Fallible iterator over BGP announcements that returns parsing errors. +/// +/// Unlike the default `UpdateIterator`, this iterator returns `Result` +/// allowing users to handle parsing errors explicitly instead of having them logged and skipped. +pub struct FallibleUpdateIterator { + parser: BgpkitParser, + elementor: Elementor, +} + +impl FallibleUpdateIterator { + pub(crate) fn new(parser: BgpkitParser) -> Self { + FallibleUpdateIterator { + parser, + elementor: Elementor::new(), + } + } +} + +impl Iterator for FallibleUpdateIterator { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + match self.parser.next_record() { + Ok(record) => { + let t = record.common_header.timestamp; + let timestamp: f64 = + if let Some(micro) = &record.common_header.microsecond_timestamp { + let m = (*micro as f64) / 1_000_000.0; + t as f64 + m + } else { + f64::from(t) + }; + + match record.message { + MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(msg)) => { + if let BgpMessage::Update(update) = msg.bgp_message { + return Some(Ok(MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate { + timestamp, + peer_ip: msg.peer_ip, + peer_asn: msg.peer_asn, + message: update, + }))); + } + continue; + } + MrtMessage::Bgp4Mp(Bgp4MpEnum::StateChange(_)) => { + continue; + } + MrtMessage::TableDumpV2Message(msg) => match msg { + TableDumpV2Message::PeerIndexTable(p) => { + self.elementor.peer_table = Some(p); + continue; + } + TableDumpV2Message::RibAfi(entries) => { + return Some(Ok(MrtUpdate::TableDumpV2Entry(TableDumpV2Entry { + timestamp, + rib_type: entries.rib_type, + sequence_number: entries.sequence_number, + prefix: entries.prefix, + rib_entries: entries.rib_entries, + }))); + } + TableDumpV2Message::RibGeneric(_) => { + continue; + } + TableDumpV2Message::GeoPeerTable(_) => { + continue; + } + }, + MrtMessage::TableDumpMessage(msg) => { + return Some(Ok(MrtUpdate::TableDumpMessage(msg))); + } + } + } + Err(e) if matches!(e.error, ParserError::EofExpected) => { + return None; + } + Err(e) => { + return Some(Err(e)); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + #[test] + fn test_bgp4mp_update_struct() { + let update = Bgp4MpUpdate { + timestamp: 1234567890.123456, + peer_ip: "192.0.2.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65000), + message: BgpUpdateMessage::default(), + }; + + assert_eq!(update.timestamp, 1234567890.123456); + assert_eq!(update.peer_ip.to_string(), "192.0.2.1"); + assert_eq!(update.peer_asn, Asn::new_32bit(65000)); + } + + #[test] + fn test_mrt_update_timestamp() { + let bgp4mp = MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate { + timestamp: 1234567890.5, + peer_ip: "192.0.2.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65000), + message: BgpUpdateMessage::default(), + }); + assert_eq!(bgp4mp.timestamp(), 1234567890.5); + + let table_dump_v2 = MrtUpdate::TableDumpV2Entry(TableDumpV2Entry { + timestamp: 1234567891.5, + rib_type: TableDumpV2Type::RibIpv4Unicast, + sequence_number: 1, + prefix: "10.0.0.0/8".parse().unwrap(), + rib_entries: vec![], + }); + assert_eq!(table_dump_v2.timestamp(), 1234567891.5); + } + + #[test] + fn test_update_iterator_empty() { + let cursor = Cursor::new(vec![]); + let parser = BgpkitParser::from_reader(cursor); + let mut iter = UpdateIterator::new(parser); + + assert!(iter.next().is_none()); + } + + #[test] + fn test_fallible_update_iterator_empty() { + let cursor = Cursor::new(vec![]); + let parser = BgpkitParser::from_reader(cursor); + let mut iter = FallibleUpdateIterator::new(parser); + + assert!(iter.next().is_none()); + } +} From 3c586459f21ad4e2b756f01c9db3407fc2062fce Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 7 Dec 2025 15:26:12 -0800 Subject: [PATCH 3/4] Add tests for update iterator and MRT enums --- src/parser/iters/update.rs | 352 +++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) diff --git a/src/parser/iters/update.rs b/src/parser/iters/update.rs index 95f5c2b1..ee20426f 100644 --- a/src/parser/iters/update.rs +++ b/src/parser/iters/update.rs @@ -343,8 +343,26 @@ mod tests { assert_eq!(update.peer_asn, Asn::new_32bit(65000)); } + #[test] + fn test_table_dump_v2_entry_struct() { + let entry = TableDumpV2Entry { + timestamp: 1234567890.0, + rib_type: TableDumpV2Type::RibIpv4Unicast, + sequence_number: 42, + prefix: "10.0.0.0/8".parse().unwrap(), + rib_entries: vec![], + }; + + assert_eq!(entry.timestamp, 1234567890.0); + assert_eq!(entry.rib_type, TableDumpV2Type::RibIpv4Unicast); + assert_eq!(entry.sequence_number, 42); + assert_eq!(entry.prefix.to_string(), "10.0.0.0/8"); + assert!(entry.rib_entries.is_empty()); + } + #[test] fn test_mrt_update_timestamp() { + // Test Bgp4MpUpdate variant let bgp4mp = MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate { timestamp: 1234567890.5, peer_ip: "192.0.2.1".parse().unwrap(), @@ -353,6 +371,7 @@ mod tests { }); assert_eq!(bgp4mp.timestamp(), 1234567890.5); + // Test TableDumpV2Entry variant let table_dump_v2 = MrtUpdate::TableDumpV2Entry(TableDumpV2Entry { timestamp: 1234567891.5, rib_type: TableDumpV2Type::RibIpv4Unicast, @@ -361,6 +380,19 @@ mod tests { rib_entries: vec![], }); assert_eq!(table_dump_v2.timestamp(), 1234567891.5); + + // Test TableDumpMessage variant + let table_dump_v1 = MrtUpdate::TableDumpMessage(TableDumpMessage { + view_number: 0, + sequence_number: 1, + prefix: "192.168.0.0/16".parse().unwrap(), + status: 1, + originated_time: 1234567892, + peer_ip: "10.0.0.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65001), + attributes: Attributes::default(), + }); + assert_eq!(table_dump_v1.timestamp(), 1234567892.0); } #[test] @@ -380,4 +412,324 @@ mod tests { assert!(iter.next().is_none()); } + + #[test] + fn test_bgp4mp_update_clone_and_debug() { + let update = Bgp4MpUpdate { + timestamp: 1234567890.123456, + peer_ip: "192.0.2.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65000), + message: BgpUpdateMessage::default(), + }; + + // Test Clone + let cloned = update.clone(); + assert_eq!(update, cloned); + + // Test Debug + let debug_str = format!("{:?}", update); + assert!(debug_str.contains("Bgp4MpUpdate")); + assert!(debug_str.contains("192.0.2.1")); + } + + #[test] + fn test_table_dump_v2_entry_clone_and_debug() { + let entry = TableDumpV2Entry { + timestamp: 1234567890.0, + rib_type: TableDumpV2Type::RibIpv4Unicast, + sequence_number: 42, + prefix: "10.0.0.0/8".parse().unwrap(), + rib_entries: vec![], + }; + + // Test Clone + let cloned = entry.clone(); + assert_eq!(entry, cloned); + + // Test Debug + let debug_str = format!("{:?}", entry); + assert!(debug_str.contains("TableDumpV2Entry")); + assert!(debug_str.contains("10.0.0.0/8")); + } + + #[test] + fn test_mrt_update_clone_and_debug() { + let update = MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate { + timestamp: 1234567890.5, + peer_ip: "192.0.2.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65000), + message: BgpUpdateMessage::default(), + }); + + // Test Clone + let cloned = update.clone(); + assert_eq!(update, cloned); + + // Test Debug + let debug_str = format!("{:?}", update); + assert!(debug_str.contains("Bgp4MpUpdate")); + } + + #[test] + fn test_fallible_update_iterator_with_invalid_data() { + // Create invalid MRT data that will trigger a parsing error + let invalid_data = vec![ + 0x00, 0x00, 0x00, 0x00, // timestamp + 0xFF, 0xFF, // invalid type + 0x00, 0x00, // subtype + 0x00, 0x00, 0x00, 0x04, // length + 0x00, 0x00, 0x00, 0x00, // dummy data + ]; + + let cursor = Cursor::new(invalid_data); + let parser = BgpkitParser::from_reader(cursor); + let mut iter = FallibleUpdateIterator::new(parser); + + // First item should be an error + let result = iter.next(); + assert!(result.is_some()); + assert!(result.unwrap().is_err()); + } + + #[test] + fn test_mrt_update_enum_variants() { + // Test that all enum variants can be constructed and matched + let updates: Vec = vec![ + MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate { + timestamp: 1.0, + peer_ip: "192.0.2.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65000), + message: BgpUpdateMessage::default(), + }), + MrtUpdate::TableDumpV2Entry(TableDumpV2Entry { + timestamp: 2.0, + rib_type: TableDumpV2Type::RibIpv6Unicast, + sequence_number: 1, + prefix: "2001:db8::/32".parse().unwrap(), + rib_entries: vec![], + }), + MrtUpdate::TableDumpMessage(TableDumpMessage { + view_number: 0, + sequence_number: 1, + prefix: "10.0.0.0/8".parse().unwrap(), + status: 1, + originated_time: 3, + peer_ip: "10.0.0.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65001), + attributes: Attributes::default(), + }), + ]; + + for (i, update) in updates.iter().enumerate() { + match update { + MrtUpdate::Bgp4MpUpdate(_) => assert_eq!(i, 0), + MrtUpdate::TableDumpV2Entry(_) => assert_eq!(i, 1), + MrtUpdate::TableDumpMessage(_) => assert_eq!(i, 2), + } + } + } + + #[test] + #[cfg(feature = "serde")] + fn test_bgp4mp_update_serde() { + let update = Bgp4MpUpdate { + timestamp: 1234567890.123456, + peer_ip: "192.0.2.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65000), + message: BgpUpdateMessage::default(), + }; + + let serialized = serde_json::to_string(&update).unwrap(); + let deserialized: Bgp4MpUpdate = serde_json::from_str(&serialized).unwrap(); + assert_eq!(update, deserialized); + } + + #[test] + #[cfg(feature = "serde")] + fn test_table_dump_v2_entry_serde() { + let entry = TableDumpV2Entry { + timestamp: 1234567890.0, + rib_type: TableDumpV2Type::RibIpv4Unicast, + sequence_number: 42, + prefix: "10.0.0.0/8".parse().unwrap(), + rib_entries: vec![], + }; + + let serialized = serde_json::to_string(&entry).unwrap(); + let deserialized: TableDumpV2Entry = serde_json::from_str(&serialized).unwrap(); + assert_eq!(entry, deserialized); + } + + #[test] + #[cfg(feature = "serde")] + fn test_mrt_update_serde() { + let update = MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate { + timestamp: 1234567890.5, + peer_ip: "192.0.2.1".parse().unwrap(), + peer_asn: Asn::new_32bit(65000), + message: BgpUpdateMessage::default(), + }); + + let serialized = serde_json::to_string(&update).unwrap(); + let deserialized: MrtUpdate = serde_json::from_str(&serialized).unwrap(); + assert_eq!(update, deserialized); + } + + /// Test parsing real UPDATES file data + #[test] + fn test_update_iterator_with_updates_file() { + let url = "https://spaces.bgpkit.org/parser/update-example"; + let parser = BgpkitParser::new(url).unwrap(); + + let mut bgp4mp_count = 0; + let mut total_announced = 0; + let mut total_withdrawn = 0; + + for update in parser.into_update_iter() { + match update { + MrtUpdate::Bgp4MpUpdate(u) => { + bgp4mp_count += 1; + total_announced += u.message.announced_prefixes.len(); + total_withdrawn += u.message.withdrawn_prefixes.len(); + // Also count MP_REACH/MP_UNREACH prefixes + for attr in &u.message.attributes { + match attr { + AttributeValue::MpReachNlri(nlri) => { + total_announced += nlri.prefixes.len(); + } + AttributeValue::MpUnreachNlri(nlri) => { + total_withdrawn += nlri.prefixes.len(); + } + _ => {} + } + } + } + MrtUpdate::TableDumpV2Entry(_) => { + panic!("Should not see TableDumpV2Entry in UPDATES file"); + } + MrtUpdate::TableDumpMessage(_) => { + panic!("Should not see TableDumpMessage in UPDATES file"); + } + } + } + + // Verify we got some data + assert!(bgp4mp_count > 0, "Should have parsed some BGP4MP updates"); + assert!( + total_announced + total_withdrawn > 0, + "Should have some prefixes" + ); + } + + /// Test parsing real RIB dump file data + #[test] + fn test_update_iterator_with_rib_file() { + let url = "https://spaces.bgpkit.org/parser/rib-example-small.bz2"; + let parser = BgpkitParser::new(url).unwrap(); + + let mut rib_entry_count = 0; + let mut total_rib_entries = 0; + + for update in parser.into_update_iter().take(100) { + match update { + MrtUpdate::Bgp4MpUpdate(_) => { + panic!("Should not see Bgp4MpUpdate in RIB file"); + } + MrtUpdate::TableDumpV2Entry(e) => { + rib_entry_count += 1; + total_rib_entries += e.rib_entries.len(); + // Verify the entry has valid data + assert!(e.sequence_number > 0 || rib_entry_count == 1); + } + MrtUpdate::TableDumpMessage(_) => { + // Legacy format is also acceptable in RIB files + } + } + } + + // Verify we got some data + assert!(rib_entry_count > 0, "Should have parsed some RIB entries"); + assert!( + total_rib_entries > 0, + "Should have some RIB entries per prefix" + ); + } + + /// Test fallible iterator with real data + #[test] + fn test_fallible_update_iterator_with_updates_file() { + let url = "https://spaces.bgpkit.org/parser/update-example"; + let parser = BgpkitParser::new(url).unwrap(); + + let mut success_count = 0; + let mut error_count = 0; + + for result in parser.into_fallible_update_iter() { + match result { + Ok(_) => success_count += 1, + Err(_) => error_count += 1, + } + } + + assert!( + success_count > 0, + "Should have parsed some updates successfully" + ); + // The test file should be valid, so we expect no errors + assert_eq!( + error_count, 0, + "Should have no parsing errors in valid file" + ); + } + + /// Test that UpdateIterator and ElemIterator yield consistent prefix counts + #[test] + fn test_update_iter_vs_elem_iter_consistency() { + let url = "https://spaces.bgpkit.org/parser/update-example"; + + // Count prefixes using UpdateIterator + let parser1 = BgpkitParser::new(url).unwrap(); + let mut update_iter_announced = 0; + let mut update_iter_withdrawn = 0; + + for update in parser1.into_update_iter() { + if let MrtUpdate::Bgp4MpUpdate(u) = update { + update_iter_announced += u.message.announced_prefixes.len(); + update_iter_withdrawn += u.message.withdrawn_prefixes.len(); + for attr in &u.message.attributes { + match attr { + AttributeValue::MpReachNlri(nlri) => { + update_iter_announced += nlri.prefixes.len(); + } + AttributeValue::MpUnreachNlri(nlri) => { + update_iter_withdrawn += nlri.prefixes.len(); + } + _ => {} + } + } + } + } + + // Count prefixes using ElemIterator + let parser2 = BgpkitParser::new(url).unwrap(); + let mut elem_iter_announced = 0; + let mut elem_iter_withdrawn = 0; + + for elem in parser2.into_elem_iter() { + match elem.elem_type { + ElemType::ANNOUNCE => elem_iter_announced += 1, + ElemType::WITHDRAW => elem_iter_withdrawn += 1, + } + } + + // Counts should match + assert_eq!( + update_iter_announced, elem_iter_announced, + "Announced prefix counts should match" + ); + assert_eq!( + update_iter_withdrawn, elem_iter_withdrawn, + "Withdrawn prefix counts should match" + ); + } } From e504b5436ff3f9a054ae78514c04dd3c0c9467b8 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 7 Dec 2025 15:28:59 -0800 Subject: [PATCH 4/4] Write MRT core dump on parse errors and fix docs When core_dump is enabled, write available record bytes to mrt_core_dump on parse errors. Also fix minor typos in CHANGELOG and examples README --- CHANGELOG.md | 2 +- examples/README.md | 4 ++-- src/parser/iters/update.rs | 6 ++++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac108746..c02f10ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ All notable changes to this project will be documented in this file. * Add bounds checks throughout parsers to avoid overread/advance/split_to panics * Handle invalid MRT BGP4MP_ET header length gracefully (reject ET records with on-wire length < 4) -* Use originated time instead of MRT header time for TableDump (v2) messages ([#252](https://github.com/bgpkit/bgpkit-parser/pull/252)) +* Use originated time instead of MRT header time for TableDumpV2 messages ([#252](https://github.com/bgpkit/bgpkit-parser/pull/252)) ### Tooling and benchmarking diff --git a/examples/README.md b/examples/README.md index 7807f6f9..9d404130 100644 --- a/examples/README.md +++ b/examples/README.md @@ -5,11 +5,11 @@ This directory contains runnable examples for bgpkit_parser. They demonstrate ba ## Quickstart and Iteration - [parse_single_file.rs](parse_single_file.rs) — Download and iterate over a single RouteViews updates file, logging each BGP element (BgpElem). - [parse_single_file_parallel.rs](parse_single_file_parallel.rs) — Parse a single compressed RIB in parallel using a raw iterator + worker pool. Downloads to current directory if remote, counts elems, and compares timing with a sequential run. Tunables via env vars: BATCH_SIZE, WORKERS, CHAN_CAP, ELEM_IN_WORKERS, QUIET_ERRORS. -- [display_elems.rs](display_elems.rs) — Print selected fields from each BGP element in a compact, pipe_delimited format. +- [display_elems.rs](display_elems.rs) — Print selected fields from each BGP element in a compact, pipe-delimited format. - [count_elems.rs](count_elems.rs) — Count the total number of BGP elements in a given file. - [records_iter.rs](records_iter.rs) — Iterate over raw MRT records and inspect/update messages; includes an example of detecting the Only_To_Customer (OTC) attribute. - [update_messages_iter.rs](update_messages_iter.rs) — Iterate over BGP announcements using the intermediate MrtUpdate representation; compares performance with BgpElem iteration and works with both UPDATES files and RIB dumps. -- [scan_mrt.rs](scan_mrt.rs) — CLI_style scanner that quickly walks an MRT file, counting raw records, parsed records, or elements without processing them. +- [scan_mrt.rs](scan_mrt.rs) — CLI-style scanner that quickly walks an MRT file, counting raw records, parsed records, or elements without processing them. ## Filtering and Policy Examples - [filters.rs](filters.rs) — Parse an MRT file and filter by a specific prefix (e.g., 211.98.251.0/24), logging matching announcements. diff --git a/src/parser/iters/update.rs b/src/parser/iters/update.rs index ee20426f..a6c7d7e0 100644 --- a/src/parser/iters/update.rs +++ b/src/parser/iters/update.rs @@ -149,6 +149,12 @@ impl Iterator for UpdateIterator { if self.parser.options.show_warnings { warn!("parser warn: {}", err_str); } + if self.parser.core_dump { + if let Some(bytes) = e.bytes { + std::fs::write("mrt_core_dump", bytes) + .expect("Unable to write to mrt_core_dump"); + } + } continue; } ParserError::ParseError(err_str) => {