11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file.

## Unreleased

### New features

* Add `UpdateIterator` and `FallibleUpdateIterator` for iterating over BGP announcements ([#250](https://github.com/bgpkit/bgpkit-parser/issues/250))
  - New `MrtUpdate` enum supporting both BGP4MP UPDATE messages and TableDumpV2 RIB entries
  - `Bgp4MpUpdate` struct for BGP4MP UPDATE messages with metadata (timestamp, peer_ip, peer_asn)
  - `TableDumpV2Entry` struct for RIB dump entries (prefix with multiple RIB entries per peer)
  - `into_update_iter()` and `into_fallible_update_iter()` methods on `BgpkitParser`
  - Middle ground between `MrtRecord` and `BgpElem` for more efficient processing
* Add `update_messages_iter` example demonstrating the new iterator

### Testing and fuzzing

* Add cargo-fuzz harness and initial fuzz targets (mrt_record, bgp_message, parser)
@@ -13,6 +23,7 @@ All notable changes to this project will be documented in this file.

* Add bounds checks throughout parsers to avoid overread/advance/split_to panics
* Handle invalid MRT BGP4MP_ET header length gracefully (reject ET records with on-wire length < 4)
* Use originated time instead of MRT header time for TableDumpV2 messages ([#252](https://github.com/bgpkit/bgpkit-parser/pull/252))

### Tooling and benchmarking

79 changes: 65 additions & 14 deletions README.md
@@ -406,6 +406,7 @@ files.par_iter().for_each(|file| {

#### Choose the right data structure
- Use [MrtRecord] iteration for minimal memory overhead
- Use [MrtUpdate] for efficient batch processing without per-prefix attribute duplication
- Use [BgpElem] for easier per-prefix analysis
- See [Data Representation](#data-representation) for detailed comparison

@@ -508,7 +509,7 @@ bgpkit-parser -o 13335 -m a -4 updates.bz2

## Data Representation

-BGPKIT Parser provides two ways to access parsed BGP data: [MrtRecord] and [BgpElem]. Choose based on your needs:
+BGPKIT Parser provides three ways to access parsed BGP data: [MrtRecord], [MrtUpdate], and [BgpElem]. Choose based on your needs:

```
┌──────────────────────────────────────────────┐
@@ -518,15 +519,15 @@ BGPKIT Parser provides two ways to access parsed BGP data: [MrtRecord] and [BgpE
├──> Parser
┌─────────────────────────────┐
│ │
▼ ▼
[MrtRecord] [BgpElem]
(Low-level) (High-level)
└──────────────┬───────────────┘
┌──────────────┼────────────────┐
[MrtRecord] [MrtUpdate] [BgpElem]
(Low-level) (Intermediate) (High-level)
└─────────────┴────────────────┘
Your Analysis Code
```

@@ -546,6 +547,48 @@ See the [MrtRecord] documentation for the complete structure definition.

**Iteration**: Use [`BgpkitParser::into_record_iter()`] to iterate over [MrtRecord]s.

### [MrtUpdate]: Intermediate Message-Level Representation

[MrtUpdate] provides access to BGP announcements without expanding them into individual per-prefix elements. This is a middle ground between [MrtRecord] and [BgpElem]. Use this when you need:
- **Efficient batch processing**: Avoid duplicating attributes across prefixes
- **Message-level analysis**: Work with UPDATE messages or RIB entries as units
- **Memory efficiency**: Shared attributes aren't cloned for each prefix

**Supported message types** (via enum variants):
- `Bgp4MpUpdate`: BGP UPDATE messages from UPDATES files
- `TableDumpV2Entry`: RIB entries from TableDumpV2 RIB dumps
- `TableDumpMessage`: Legacy TableDump v1 messages

**Example**:
```rust
use bgpkit_parser::{BgpkitParser, MrtUpdate};

let parser = BgpkitParser::new("updates.mrt.bz2").unwrap();
for update in parser.into_update_iter() {
    match update {
        MrtUpdate::Bgp4MpUpdate(u) => {
            // One UPDATE message may contain multiple prefixes sharing attributes
            println!("Peer {} announced {} prefixes",
                u.peer_ip,
                u.message.announced_prefixes.len()
            );
        }
        MrtUpdate::TableDumpV2Entry(e) => {
            // One prefix with multiple RIB entries (one per peer)
            println!("Prefix {} seen by {} peers",
                e.prefix,
                e.rib_entries.len()
            );
        }
        MrtUpdate::TableDumpMessage(m) => {
            println!("Legacy table dump for {}", m.prefix);
        }
    }
}
```

**Iteration**: Use [`BgpkitParser::into_update_iter()`] to iterate over [MrtUpdate]s.
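
**Counting announcements per variant**: The `update_messages_iter` example in this change counts announcements differently for each variant: classic NLRI plus MP_REACH_NLRI prefixes for BGP4MP, one announcement per RIB entry for TableDumpV2, and one per legacy TableDump record. The sketch below illustrates only that dispatch logic. `SketchUpdate` and its fields are hypothetical stand-ins so the snippet compiles without the crate; they are not the real `MrtUpdate` definition.

```rust
/// Hypothetical, simplified stand-in for the three `MrtUpdate` variants.
/// The real variants carry full messages; here each one only carries counts.
enum SketchUpdate {
    /// BGP4MP UPDATE: classic NLRI prefixes plus prefixes inside MP_REACH_NLRI.
    Bgp4Mp { announced: usize, mp_reach: usize },
    /// TableDumpV2: one prefix with one RIB entry per peer.
    TableDumpV2 { rib_entries: usize },
    /// Legacy TableDump v1: one record, one prefix.
    TableDumpV1,
}

/// Number of announcements represented by a single update.
fn announced_count(update: &SketchUpdate) -> usize {
    match update {
        SketchUpdate::Bgp4Mp { announced, mp_reach } => announced + mp_reach,
        SketchUpdate::TableDumpV2 { rib_entries } => *rib_entries,
        SketchUpdate::TableDumpV1 => 1,
    }
}

/// Total announcements across a batch of updates.
fn total_announced(updates: &[SketchUpdate]) -> usize {
    updates.iter().map(announced_count).sum()
}

fn main() {
    let updates = vec![
        SketchUpdate::Bgp4Mp { announced: 2, mp_reach: 3 },
        SketchUpdate::TableDumpV2 { rib_entries: 4 },
        SketchUpdate::TableDumpV1,
    ];
    println!("total announcements: {}", total_announced(&updates));
}
```

With the real types, the same `match` shape applies, but the MP_REACH_NLRI count has to be extracted from the message attributes, as the full example does.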

### [BgpElem]: High-level Per-Prefix Representation

[BgpElem] provides a simplified, per-prefix view of BGP data. Each [BgpElem] represents a single prefix announcement or withdrawal. Use this when you want:
@@ -589,10 +632,18 @@ See the [BgpElem] documentation for the complete structure definition.

### Which One Should I Use?

-- **Use [BgpElem]** (default): For most BGP analysis tasks, prefix tracking, AS path analysis
-- **Use [MrtRecord]**: When you need MRT format details, re-encoding, or minimal memory overhead
-
-**Memory trade-off**: [BgpElem] duplicates shared attributes (AS path, communities) for each prefix, consuming more memory but providing simpler analysis.
+| Use Case | Recommended | Why |
+|----------|-------------|-----|
+| Simple prefix analysis | [BgpElem] | Easy per-prefix access, format-agnostic |
+| High-performance processing | [MrtUpdate] | Avoids attribute duplication overhead |
+| Counting prefixes per UPDATE | [MrtUpdate] | Direct access to message structure |
+| Re-encoding MRT data | [MrtRecord] | Preserves complete MRT structure |
+| MRT format-specific details | [MrtRecord] | Access to peer index tables, geo-location, etc. |
+
+**Memory trade-off**:
+- [BgpElem] duplicates shared attributes (AS path, communities) for each prefix
+- [MrtUpdate] keeps attributes shared within each message/entry
+- [MrtRecord] has minimal overhead but requires more code to extract BGP data
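
To make the duplication cost concrete, here is a minimal sketch built on hypothetical stand-in types. `Attrs`, `Update`, and `Elem` are illustrative only and are not the crate's structs. It assumes that a per-prefix view owns a clone of the attributes, while a message-level view stores them once:

```rust
/// Hypothetical stand-in for shared BGP path attributes (not the crate's type).
#[derive(Clone, Debug, PartialEq)]
struct Attrs {
    as_path: Vec<u32>,
}

/// Message-level view: one attribute set shared by every prefix in the message.
struct Update {
    attrs: Attrs,
    prefixes: Vec<String>,
}

/// Per-prefix view: every element owns its own copy of the attributes.
struct Elem {
    prefix: String,
    attrs: Attrs,
}

/// Expand one message into per-prefix elements, cloning the attributes each time.
fn expand(update: &Update) -> Vec<Elem> {
    update
        .prefixes
        .iter()
        .map(|p| Elem {
            prefix: p.clone(),
            attrs: update.attrs.clone(),
        })
        .collect()
}

/// AS-path hops held in memory: (stored once per message, stored once per prefix).
fn stored_hops(update: &Update) -> (usize, usize) {
    let message_level: usize = update.attrs.as_path.len();
    let per_prefix: usize = expand(update)
        .iter()
        .map(|e| e.attrs.as_path.len())
        .sum();
    (message_level, per_prefix)
}

fn main() {
    let update = Update {
        attrs: Attrs { as_path: vec![64500, 64501, 64502] },
        prefixes: vec![
            "192.0.2.0/24".to_string(),
            "198.51.100.0/24".to_string(),
            "203.0.113.0/24".to_string(),
        ],
    };
    for elem in expand(&update) {
        println!("{} via {:?}", elem.prefix, elem.attrs.as_path);
    }
    let (shared, duplicated) = stored_hops(&update);
    println!("AS-path hops stored: {} shared vs {} duplicated", shared, duplicated);
}
```

In this toy model the duplicated storage grows with the number of prefixes per message, which is the overhead the message-level representation is meant to avoid.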

## RFCs Support

26 changes: 26 additions & 0 deletions benches/internals.rs
@@ -78,6 +78,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
        })
    });

    c.bench_function("updates into_update_iter", |b| {
        b.iter(|| {
            let mut reader = black_box(&updates[..]);

            BgpkitParser::from_reader(&mut reader)
                .into_update_iter()
                .take(RECORD_LIMIT)
                .for_each(|x| {
                    black_box(x);
                });
        })
    });

    c.bench_function("rib into_record_iter", |b| {
        b.iter(|| {
            let mut reader = black_box(&rib_dump[..]);
@@ -103,6 +116,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
                });
        })
    });

    c.bench_function("rib into_update_iter", |b| {
        b.iter(|| {
            let mut reader = black_box(&rib_dump[..]);

            BgpkitParser::from_reader(&mut reader)
                .into_update_iter()
                .take(RECORD_LIMIT)
                .for_each(|x| {
                    black_box(x);
                });
        })
    });
}

criterion_group! {
5 changes: 3 additions & 2 deletions examples/README.md
@@ -5,10 +5,11 @@ This directory contains runnable examples for bgpkit_parser. They demonstrate ba
## Quickstart and Iteration
- [parse_single_file.rs](parse_single_file.rs) — Download and iterate over a single RouteViews updates file, logging each BGP element (BgpElem).
- [parse_single_file_parallel.rs](parse_single_file_parallel.rs) — Parse a single compressed RIB in parallel using a raw iterator + worker pool. Downloads to current directory if remote, counts elems, and compares timing with a sequential run. Tunables via env vars: BATCH_SIZE, WORKERS, CHAN_CAP, ELEM_IN_WORKERS, QUIET_ERRORS.
-- [display_elems.rs](display_elems.rs) — Print selected fields from each BGP element in a compact, pipe_delimited format.
+- [display_elems.rs](display_elems.rs) — Print selected fields from each BGP element in a compact, pipe-delimited format.
- [count_elems.rs](count_elems.rs) — Count the total number of BGP elements in a given file.
- [records_iter.rs](records_iter.rs) — Iterate over raw MRT records and inspect/update messages; includes an example of detecting the Only_To_Customer (OTC) attribute.
-- [scan_mrt.rs](scan_mrt.rs) — CLI_style scanner that quickly walks an MRT file, counting raw records, parsed records, or elements without processing them.
+- [update_messages_iter.rs](update_messages_iter.rs) — Iterate over BGP announcements using the intermediate MrtUpdate representation; compares performance with BgpElem iteration and works with both UPDATES files and RIB dumps.
+- [scan_mrt.rs](scan_mrt.rs) — CLI-style scanner that quickly walks an MRT file, counting raw records, parsed records, or elements without processing them.

## Filtering and Policy Examples
- [filters.rs](filters.rs) — Parse an MRT file and filter by a specific prefix (e.g., 211.98.251.0/24), logging matching announcements.
185 changes: 185 additions & 0 deletions examples/update_messages_iter.rs
@@ -0,0 +1,185 @@
//! Example demonstrating the use of `into_update_iter()` for processing BGP announcements.
//!
//! The `UpdateIterator` provides a middle ground between `RecordIterator` and `ElemIterator`:
//! - More focused than `RecordIterator` as it only yields BGP announcements
//! - More efficient than `ElemIterator` as it avoids duplicating attributes for each prefix
//!
//! This iterator handles both:
//! - **BGP4MP UPDATE messages** from UPDATES files (real-time updates)
//! - **TableDumpV2 RIB entries** from RIB dump files (routing table snapshots)
//!
//! This example compares the performance of `UpdateIterator` vs `ElemIterator` when counting
//! announced and withdrawn prefixes, and verifies that both approaches yield the same results.
//!
//! Run with: cargo run --example update_messages_iter --release

use bgpkit_parser::models::{AttributeValue, ElemType};
use bgpkit_parser::{BgpkitParser, MrtUpdate};
use std::time::Instant;

fn main() {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    // You can test with either an UPDATES file or a RIB dump file:
    // UPDATES file (BGP4MP messages):
    // let url = "https://archive.routeviews.org/bgpdata/2024.11/UPDATES/updates.20241101.0000.bz2";
    // RIB dump file (TableDumpV2 messages):
    let url = "https://archive.routeviews.org/bgpdata/2024.11/RIBS/rib.20241101.0000.bz2";

    log::info!("Parsing MRT file: {}", url);
    log::info!("");

    // ========================================
    // Method 1: Using UpdateIterator
    // ========================================
    log::info!("=== Method 1: UpdateIterator ===");

    let start = Instant::now();
    let parser = BgpkitParser::new(url).unwrap();

    let mut bgp4mp_update_count = 0;
    let mut rib_entry_count = 0;
    let mut table_dump_v1_count = 0;
    let mut update_iter_announced = 0;
    let mut update_iter_withdrawn = 0;

    for update in parser.into_update_iter() {
        match update {
            MrtUpdate::Bgp4MpUpdate(update) => {
                bgp4mp_update_count += 1;

                // Count announced prefixes (both from announced_prefixes and MP_REACH_NLRI)
                let announced_count = update.message.announced_prefixes.len();
                let mp_reach_count: usize = update
                    .message
                    .attributes
                    .iter()
                    .filter_map(|attr| {
                        if let AttributeValue::MpReachNlri(nlri) = attr {
                            Some(nlri.prefixes.len())
                        } else {
                            None
                        }
                    })
                    .sum();
                update_iter_announced += announced_count + mp_reach_count;

                // Count withdrawn prefixes (both from withdrawn_prefixes and MP_UNREACH_NLRI)
                let withdrawn_count = update.message.withdrawn_prefixes.len();
                let mp_unreach_count: usize = update
                    .message
                    .attributes
                    .iter()
                    .filter_map(|attr| {
                        if let AttributeValue::MpUnreachNlri(nlri) = attr {
                            Some(nlri.prefixes.len())
                        } else {
                            None
                        }
                    })
                    .sum();
                update_iter_withdrawn += withdrawn_count + mp_unreach_count;
            }
            MrtUpdate::TableDumpV2Entry(entry) => {
                rib_entry_count += 1;
                // In TableDumpV2, each entry represents ONE prefix with multiple RIB entries (one per peer)
                // Each RIB entry is an announcement of that prefix
                update_iter_announced += entry.rib_entries.len();
            }
            MrtUpdate::TableDumpMessage(_msg) => {
                table_dump_v1_count += 1;
                // Legacy TableDump v1: one record = one prefix = one announcement
                update_iter_announced += 1;
            }
        }
    }

    let update_iter_duration = start.elapsed();

    log::info!("Message counts:");
    log::info!("  - BGP4MP UPDATE messages: {}", bgp4mp_update_count);
    log::info!("  - TableDumpV2 RIB entries: {}", rib_entry_count);
    log::info!("  - TableDump v1 messages: {}", table_dump_v1_count);
    log::info!("Total announced prefixes: {}", update_iter_announced);
    log::info!("Total withdrawn prefixes: {}", update_iter_withdrawn);
    log::info!("Time elapsed: {:?}", update_iter_duration);
    log::info!("");

    // ========================================
    // Method 2: Using ElemIterator
    // ========================================
    log::info!("=== Method 2: ElemIterator ===");

    let start = Instant::now();
    let parser = BgpkitParser::new(url).unwrap();

    let mut elem_count = 0;
    let mut elem_iter_announced = 0;
    let mut elem_iter_withdrawn = 0;

    for elem in parser.into_elem_iter() {
        elem_count += 1;

        match elem.elem_type {
            ElemType::ANNOUNCE => elem_iter_announced += 1,
            ElemType::WITHDRAW => elem_iter_withdrawn += 1,
        }
    }

    let elem_iter_duration = start.elapsed();

    log::info!("Total BGP elements: {}", elem_count);
    log::info!("Total announced prefixes: {}", elem_iter_announced);
    log::info!("Total withdrawn prefixes: {}", elem_iter_withdrawn);
    log::info!("Time elapsed: {:?}", elem_iter_duration);
    log::info!("");

    // ========================================
    // Comparison
    // ========================================
    log::info!("=== Comparison ===");

    let announced_match = update_iter_announced == elem_iter_announced;
    let withdrawn_match = update_iter_withdrawn == elem_iter_withdrawn;

    log::info!(
        "Announced prefixes match: {} (UpdateIter: {}, ElemIter: {})",
        if announced_match { "✓" } else { "✗" },
        update_iter_announced,
        elem_iter_announced
    );
    log::info!(
        "Withdrawn prefixes match: {} (UpdateIter: {}, ElemIter: {})",
        if withdrawn_match { "✓" } else { "✗" },
        update_iter_withdrawn,
        elem_iter_withdrawn
    );

    if update_iter_duration.as_nanos() > 0 {
        let speedup = elem_iter_duration.as_secs_f64() / update_iter_duration.as_secs_f64();
        log::info!(
            "Performance: UpdateIterator is {:.2}x {} than ElemIterator",
            if speedup >= 1.0 {
                speedup
            } else {
                1.0 / speedup
            },
            if speedup >= 1.0 { "faster" } else { "slower" }
        );
    }
    log::info!("  - UpdateIterator: {:?}", update_iter_duration);
    log::info!("  - ElemIterator: {:?}", elem_iter_duration);

    // Assert counts match
    assert_eq!(
        update_iter_announced, elem_iter_announced,
        "Announced prefix counts should match!"
    );
    assert_eq!(
        update_iter_withdrawn, elem_iter_withdrawn,
        "Withdrawn prefix counts should match!"
    );

    log::info!("");
    log::info!("All counts verified successfully!");
}