diff --git a/crates/audit/src/archiver.rs b/crates/audit/src/archiver.rs
index 80f038a..76fe795 100644
--- a/crates/audit/src/archiver.rs
+++ b/crates/audit/src/archiver.rs
@@ -1,6 +1,11 @@
-use crate::metrics::Metrics;
-use crate::reader::EventReader;
-use crate::storage::EventWriter;
+use crate::metrics::{
+    EventType, decrement_in_flight_archive_tasks, increment_events_processed,
+    increment_failed_archive_tasks, increment_in_flight_archive_tasks,
+    record_archive_event_duration, record_event_age, record_kafka_commit_duration,
+    record_kafka_read_duration,
+};
+use crate::reader::{EventReader, UserOpEventReader};
+use crate::storage::{EventWriter, UserOpEventWriter};
 use anyhow::Result;
 use std::fmt;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -15,7 +20,6 @@ where
 {
     reader: R,
     writer: W,
-    metrics: Metrics,
 }
 impl<R, W> fmt::Debug for KafkaAuditArchiver<R, W>
 where
@@ -35,11 +39,7 @@ where
 {
     /// Creates a new archiver with the given reader and writer.
     pub fn new(reader: R, writer: W) -> Self {
-        Self {
-            reader,
-            writer,
-            metrics: Metrics::default(),
-        }
+        Self { reader, writer }
     }
 
     /// Runs the archiver loop, reading events and writing them to storage.
@@ -50,42 +50,35 @@
             let read_start = Instant::now();
             match self.reader.read_event().await {
                 Ok(event) => {
-                    self.metrics
-                        .kafka_read_duration
-                        .record(read_start.elapsed().as_secs_f64());
+                    record_kafka_read_duration(read_start.elapsed(), EventType::Bundle);
 
                     let now_ms = SystemTime::now()
                         .duration_since(UNIX_EPOCH)
                         .unwrap_or_default()
                         .as_millis() as i64;
                     let event_age_ms = now_ms.saturating_sub(event.timestamp);
-                    self.metrics.event_age.record(event_age_ms as f64);
+                    record_event_age(event_age_ms as f64, EventType::Bundle);
 
                     // TODO: the integration test breaks because Minio doesn't support etag
                     let writer = self.writer.clone();
-                    let metrics = self.metrics.clone();
-                    self.metrics.in_flight_archive_tasks.increment(1.0);
+                    increment_in_flight_archive_tasks(EventType::Bundle);
                     tokio::spawn(async move {
                         let archive_start = Instant::now();
                         if let Err(e) = writer.archive_event(event).await {
                             error!(error = %e, "Failed to write event");
-                            metrics.failed_archive_tasks.increment(1);
+                            increment_failed_archive_tasks(EventType::Bundle);
                         } else {
-                            metrics
-                                .archive_event_duration
-                                .record(archive_start.elapsed().as_secs_f64());
-                            metrics.events_processed.increment(1);
+                            record_archive_event_duration(archive_start.elapsed(), EventType::Bundle);
+                            increment_events_processed(EventType::Bundle);
                         }
-                        metrics.in_flight_archive_tasks.decrement(1.0);
+                        decrement_in_flight_archive_tasks(EventType::Bundle);
                     });
 
                     let commit_start = Instant::now();
                     if let Err(e) = self.reader.commit().await {
                         error!(error = %e, "Failed to commit message");
                     }
-                    self.metrics
-                        .kafka_commit_duration
-                        .record(commit_start.elapsed().as_secs_f64());
+                    record_kafka_commit_duration(commit_start.elapsed(), EventType::Bundle);
                 }
                 Err(e) => {
                     error!(error = %e, "Error reading events");
@@ -95,3 +88,66 @@
         }
     }
 }
+
+pub struct KafkaUserOpAuditArchiver<R, W>
+where
+    R: UserOpEventReader,
+    W: UserOpEventWriter + Clone + Send + 'static,
+{
+    reader: R,
+    writer: W,
+}
+
+impl<R, W> KafkaUserOpAuditArchiver<R, W>
+where
+    R: UserOpEventReader,
+    W: UserOpEventWriter + Clone + Send + 'static,
+{
+    pub fn new(reader: R, writer: W) -> Self {
+        Self { reader, writer }
+    }
+
+    pub async fn run(&mut self) -> Result<()> {
+        info!("Starting Kafka UserOp archiver");
+
+        loop {
+            let read_start = Instant::now();
+            match self.reader.read_event().await {
+                Ok(event) => {
+                    record_kafka_read_duration(read_start.elapsed(), EventType::UserOp);
+
+                    let now_ms = SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap_or_default()
+                        .as_millis() as i64;
+                    let event_age_ms = now_ms.saturating_sub(event.timestamp);
+                    record_event_age(event_age_ms as f64, EventType::UserOp);
+
+                    let writer = self.writer.clone();
+                    increment_in_flight_archive_tasks(EventType::UserOp);
+                    tokio::spawn(async move {
+                        let archive_start = Instant::now();
+                        if let Err(e) = writer.archive_userop_event(event).await {
+                            error!(error = %e, "Failed to write UserOp event");
+                            increment_failed_archive_tasks(EventType::UserOp);
+                        } else {
+                            record_archive_event_duration(archive_start.elapsed(), EventType::UserOp);
+                            increment_events_processed(EventType::UserOp);
+                        }
+                        decrement_in_flight_archive_tasks(EventType::UserOp);
+                    });
+
+                    let commit_start = Instant::now();
+                    if let Err(e) = self.reader.commit().await {
+                        error!(error = %e, "Failed to commit message");
+                    }
+                    record_kafka_commit_duration(commit_start.elapsed(), EventType::UserOp);
+                }
+                Err(e) => {
+                    error!(error = %e, "Error reading UserOp events");
+                    sleep(Duration::from_secs(1)).await;
+                }
+            }
+        }
+    }
+}
diff --git a/crates/audit/src/metrics.rs b/crates/audit/src/metrics.rs
index 906de8e..cad3cc3 100644
--- a/crates/audit/src/metrics.rs
+++ b/crates/audit/src/metrics.rs
@@ -1,26 +1,65 @@
 use metrics::{Counter, Gauge, Histogram};
 use metrics_derive::Metrics;
+use std::time::Duration;
 
-/// Metrics for audit operations including Kafka reads, S3 writes, and event processing.
-#[derive(Metrics, Clone)]
-#[metrics(scope = "tips_audit")]
-pub struct Metrics {
-    /// Duration of archive_event operations.
-    #[metric(describe = "Duration of archive_event")]
-    pub archive_event_duration: Histogram,
+/// Event type tag for metrics differentiation
+#[derive(Clone, Copy)]
+pub enum EventType {
+    Bundle,
+    UserOp,
+}
+
+impl EventType {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            EventType::Bundle => "bundle",
+            EventType::UserOp => "userop",
+        }
+    }
+}
 
-    /// Age of event when processed (now - event timestamp).
-    #[metric(describe = "Age of event when processed (now - event timestamp)")]
-    pub event_age: Histogram,
+pub fn record_archive_event_duration(duration: Duration, event_type: EventType) {
+    metrics::histogram!("tips_audit_archive_event_duration", "type" => event_type.as_str())
+        .record(duration.as_secs_f64());
+}
+
+pub fn record_event_age(age_ms: f64, event_type: EventType) {
+    metrics::histogram!("tips_audit_event_age", "type" => event_type.as_str()).record(age_ms);
+}
 
-    /// Duration of Kafka read_event operations.
-    #[metric(describe = "Duration of Kafka read_event")]
-    pub kafka_read_duration: Histogram,
+pub fn record_kafka_read_duration(duration: Duration, event_type: EventType) {
+    metrics::histogram!("tips_audit_kafka_read_duration", "type" => event_type.as_str())
+        .record(duration.as_secs_f64());
+}
 
-    /// Duration of Kafka commit operations.
-    #[metric(describe = "Duration of Kafka commit")]
-    pub kafka_commit_duration: Histogram,
+pub fn record_kafka_commit_duration(duration: Duration, event_type: EventType) {
+    metrics::histogram!("tips_audit_kafka_commit_duration", "type" => event_type.as_str())
+        .record(duration.as_secs_f64());
+}
+
+pub fn increment_events_processed(event_type: EventType) {
+    metrics::counter!("tips_audit_events_processed", "type" => event_type.as_str()).increment(1);
+}
+pub fn increment_in_flight_archive_tasks(event_type: EventType) {
+    metrics::gauge!("tips_audit_in_flight_archive_tasks", "type" => event_type.as_str())
+        .increment(1.0);
+}
+
+pub fn decrement_in_flight_archive_tasks(event_type: EventType) {
+    metrics::gauge!("tips_audit_in_flight_archive_tasks", "type" => event_type.as_str())
+        .decrement(1.0);
+}
+
+pub fn increment_failed_archive_tasks(event_type: EventType) {
+    metrics::counter!("tips_audit_failed_archive_tasks", "type" => event_type.as_str())
+        .increment(1);
+}
+
+/// Metrics for audit operations including Kafka reads, S3 writes, and event processing.
+#[derive(Metrics, Clone)]
+#[metrics(scope = "tips_audit")]
+pub struct Metrics {
     /// Duration of update_bundle_history operations.
     #[metric(describe = "Duration of update_bundle_history")]
     pub update_bundle_history_duration: Histogram,
@@ -37,19 +76,7 @@ pub struct Metrics {
     #[metric(describe = "Duration of S3 put_object")]
     pub s3_put_duration: Histogram,
 
-    /// Total events processed.
-    #[metric(describe = "Total events processed")]
-    pub events_processed: Counter,
-
     /// Total S3 writes skipped due to deduplication.
     #[metric(describe = "Total S3 writes skipped due to dedup")]
    pub s3_writes_skipped: Counter,
-
-    /// Number of in-flight archive tasks.
-    #[metric(describe = "Number of in-flight archive tasks")]
-    pub in_flight_archive_tasks: Gauge,
-
-    /// Number of failed archive tasks.
-    #[metric(describe = "Number of failed archive tasks")]
-    pub failed_archive_tasks: Counter,
 }
diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs
index f2bb9d3..0e86d45 100644
--- a/crates/audit/tests/integration_tests.rs
+++ b/crates/audit/tests/integration_tests.rs
@@ -1,13 +1,14 @@
-use alloy_primitives::{Address, B256, U256};
+use alloy_primitives::{Address, B256, TxHash, U256};
 use std::time::Duration;
 use tips_audit_lib::{
-    KafkaAuditArchiver, KafkaAuditLogReader, KafkaUserOpAuditLogReader, UserOpEventReader,
+    KafkaAuditArchiver, KafkaAuditLogReader, KafkaUserOpAuditArchiver, KafkaUserOpAuditLogReader,
+    UserOpEventReader,
     publisher::{
         BundleEventPublisher, KafkaBundleEventPublisher, KafkaUserOpEventPublisher,
         UserOpEventPublisher,
     },
-    storage::{BundleEventS3Reader, S3EventReaderWriter},
-    types::{BundleEvent, DropReason, UserOpEvent},
+    storage::{BundleEventS3Reader, S3EventReaderWriter, UserOpEventS3Reader, UserOpEventWriter},
+    types::{BundleEvent, DropReason, UserOpDropReason, UserOpEvent},
 };
 use tips_core::test_utils::create_bundle_from_txn_data;
 use uuid::Uuid;
@@ -123,3 +124,387 @@ async fn test_userop_kafka_publisher_reader_integration()
 
     Ok(())
 }
+
+#[tokio::test]
+#[ignore = "TODO doesn't appear to work with minio, should test against a real S3 bucket"]
+async fn test_userop_kafka_publisher_s3_archiver_integration()
+-> Result<(), Box<dyn std::error::Error>> {
+    let harness = TestHarness::new().await?;
+    let topic = "test-userop-audit-events";
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[1u8; 32]);
+    let test_sender = Address::from_slice(&[2u8; 20]);
+    let test_entry_point = Address::from_slice(&[3u8; 20]);
+    let test_nonce = U256::from(42);
+
+    let test_event = UserOpEvent::AddedToMempool {
+        user_op_hash: test_user_op_hash,
+        sender: test_sender,
+        entry_point: test_entry_point,
+        nonce: test_nonce,
+    };
+
+    let publisher = KafkaUserOpEventPublisher::new(harness.kafka_producer, topic.to_string());
+    publisher.publish(test_event.clone()).await?;
+
+    let mut archiver = KafkaUserOpAuditArchiver::new(
+        KafkaUserOpAuditLogReader::new(harness.kafka_consumer, topic.to_string())?,
+        s3_writer.clone(),
+    );
+
+    tokio::spawn(async move {
+        archiver.run().await.expect("error running archiver");
+    });
+
+    let mut counter = 0;
+    loop {
+        counter += 1;
+        if counter > 10 {
+            panic!("unable to complete archiving within the deadline");
+        }
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+        let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+
+        if let Some(h) = history {
+            if !h.history.is_empty() {
+                assert_eq!(h.history.len(), 1);
+                break;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_userop_end_to_end_single_event()
+-> Result<(), Box<dyn std::error::Error>> {
+    let harness = TestHarness::new().await?;
+    let topic = "test-e2e-single";
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[10u8; 32]);
+    let test_sender = Address::from_slice(&[11u8; 20]);
+    let test_entry_point = Address::from_slice(&[12u8; 20]);
+    let test_nonce = U256::from(100);
+
+    let test_event = UserOpEvent::AddedToMempool {
+        user_op_hash: test_user_op_hash,
+        sender: test_sender,
+        entry_point: test_entry_point,
+        nonce: test_nonce,
+    };
+
+    let publisher = KafkaUserOpEventPublisher::new(harness.kafka_producer, topic.to_string());
+    publisher.publish(test_event.clone()).await?;
+
+    let mut reader = KafkaUserOpAuditLogReader::new(harness.kafka_consumer, topic.to_string())?;
+    let received = tokio::time::timeout(Duration::from_secs(10), reader.read_event()).await??;
+
+    s3_writer.archive_userop_event(received).await?;
+
+    let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+    assert!(history.is_some(), "History should exist after archiving");
+
+    let h = history.unwrap();
+    assert_eq!(h.history.len(), 1, "Should have exactly one event");
+
+    reader.commit().await?;
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_userop_end_to_end_multiple_events_same_userop()
+-> Result<(), Box<dyn std::error::Error>> {
+    let harness = TestHarness::new().await?;
+    let topic = "test-e2e-multiple";
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[20u8; 32]);
+    let test_sender = Address::from_slice(&[21u8; 20]);
+    let test_entry_point = Address::from_slice(&[22u8; 20]);
+
+    let events = vec![
+        UserOpEvent::AddedToMempool {
+            user_op_hash: test_user_op_hash,
+            sender: test_sender,
+            entry_point: test_entry_point,
+            nonce: U256::from(1),
+        },
+        UserOpEvent::Included {
+            user_op_hash: test_user_op_hash,
+            block_number: 12345,
+            tx_hash: TxHash::from_slice(&[99u8; 32]),
+        },
+    ];
+
+    let publisher = KafkaUserOpEventPublisher::new(harness.kafka_producer, topic.to_string());
+    for event in &events {
+        publisher.publish(event.clone()).await?;
+    }
+
+    let mut reader = KafkaUserOpAuditLogReader::new(harness.kafka_consumer, topic.to_string())?;
+
+    for _ in 0..events.len() {
+        let received = tokio::time::timeout(Duration::from_secs(10), reader.read_event()).await??;
+        s3_writer.archive_userop_event(received).await?;
+        reader.commit().await?;
+    }
+
+    let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+    assert!(history.is_some());
+
+    let h = history.unwrap();
+    assert_eq!(h.history.len(), 2, "Should have two events in history");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_userop_end_to_end_kafka_redelivery_deduplication()
+-> Result<(), Box<dyn std::error::Error>> {
+    use tips_audit_lib::storage::UserOpEventWrapper;
+
+    let harness = TestHarness::new().await?;
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[30u8; 32]);
+    let test_sender = Address::from_slice(&[31u8; 20]);
+    let test_entry_point = Address::from_slice(&[32u8; 20]);
+
+    let test_event = UserOpEvent::AddedToMempool {
+        user_op_hash: test_user_op_hash,
+        sender: test_sender,
+        entry_point: test_entry_point,
+        nonce: U256::from(1),
+    };
+
+    let same_key = "same-key-for-redelivery".to_string();
+    let wrapped1 = UserOpEventWrapper {
+        key: same_key.clone(),
+        event: test_event.clone(),
+        timestamp: 1000,
+    };
+    let wrapped2 = UserOpEventWrapper {
+        key: same_key.clone(),
+        event: test_event.clone(),
+        timestamp: 2000,
+    };
+
+    s3_writer.archive_userop_event(wrapped1).await?;
+    s3_writer.archive_userop_event(wrapped2).await?;
+
+    let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+    assert!(history.is_some());
+
+    let h = history.unwrap();
+    assert_eq!(
+        h.history.len(),
+        1,
+        "Kafka redelivery (same key) should be deduplicated, expected 1 but got {}",
+        h.history.len()
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_userop_end_to_end_all_event_types()
+-> Result<(), Box<dyn std::error::Error>> {
+    let harness = TestHarness::new().await?;
+    let topic = "test-e2e-all-types";
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[40u8; 32]);
+    let test_sender = Address::from_slice(&[41u8; 20]);
+    let test_entry_point = Address::from_slice(&[42u8; 20]);
+
+    let events = vec![
+        UserOpEvent::AddedToMempool {
+            user_op_hash: test_user_op_hash,
+            sender: test_sender,
+            entry_point: test_entry_point,
+            nonce: U256::from(1),
+        },
+        UserOpEvent::Dropped {
+            user_op_hash: test_user_op_hash,
+            reason: UserOpDropReason::Expired,
+        },
+    ];
+
+    let publisher = KafkaUserOpEventPublisher::new(harness.kafka_producer, topic.to_string());
+    for event in &events {
+        publisher.publish(event.clone()).await?;
+    }
+
+    let mut reader = KafkaUserOpAuditLogReader::new(harness.kafka_consumer, topic.to_string())?;
+
+    for _ in 0..events.len() {
+        let received = tokio::time::timeout(Duration::from_secs(10), reader.read_event()).await??;
+        s3_writer.archive_userop_event(received).await?;
+        reader.commit().await?;
+    }
+
+    let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+    assert!(history.is_some());
+
+    let h = history.unwrap();
+    assert_eq!(h.history.len(), 2, "Should have both event types");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_userop_end_to_end_dropped_with_reason()
+-> Result<(), Box<dyn std::error::Error>> {
+    let harness = TestHarness::new().await?;
+    let topic = "test-e2e-dropped";
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[50u8; 32]);
+
+    let test_event = UserOpEvent::Dropped {
+        user_op_hash: test_user_op_hash,
+        reason: UserOpDropReason::Invalid("gas too low".to_string()),
+    };
+
+    let publisher = KafkaUserOpEventPublisher::new(harness.kafka_producer, topic.to_string());
+    publisher.publish(test_event.clone()).await?;
+
+    let mut reader = KafkaUserOpAuditLogReader::new(harness.kafka_consumer, topic.to_string())?;
+    let received = tokio::time::timeout(Duration::from_secs(10), reader.read_event()).await??;
+
+    match &received.event {
+        UserOpEvent::Dropped { reason, .. } => match reason {
+            UserOpDropReason::Invalid(msg) => {
+                assert_eq!(msg, "gas too low");
+            }
+            _ => panic!("Expected Invalid reason"),
+        },
+        _ => panic!("Expected Dropped event"),
+    }
+
+    s3_writer.archive_userop_event(received).await?;
+
+    let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+    assert!(history.is_some());
+    assert_eq!(history.unwrap().history.len(), 1);
+
+    reader.commit().await?;
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_userop_end_to_end_included_event()
+-> Result<(), Box<dyn std::error::Error>> {
+    let harness = TestHarness::new().await?;
+    let topic = "test-e2e-included";
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[60u8; 32]);
+    let test_tx_hash = TxHash::from_slice(&[61u8; 32]);
+    let test_block_number = 999999u64;
+
+    let test_event = UserOpEvent::Included {
+        user_op_hash: test_user_op_hash,
+        block_number: test_block_number,
+        tx_hash: test_tx_hash,
+    };
+
+    let publisher = KafkaUserOpEventPublisher::new(harness.kafka_producer, topic.to_string());
+    publisher.publish(test_event.clone()).await?;
+
+    let mut reader = KafkaUserOpAuditLogReader::new(harness.kafka_consumer, topic.to_string())?;
+    let received = tokio::time::timeout(Duration::from_secs(10), reader.read_event()).await??;
+
+    match &received.event {
+        UserOpEvent::Included {
+            block_number,
+            tx_hash,
+            ..
+        } => {
+            assert_eq!(*block_number, test_block_number);
+            assert_eq!(*tx_hash, test_tx_hash);
+        }
+        _ => panic!("Expected Included event"),
+    }
+
+    s3_writer.archive_userop_event(received).await?;
+
+    let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+    assert!(history.is_some());
+    assert_eq!(history.unwrap().history.len(), 1);
+
+    reader.commit().await?;
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_userop_end_to_end_full_lifecycle()
+-> Result<(), Box<dyn std::error::Error>> {
+    let harness = TestHarness::new().await?;
+    let topic = "test-e2e-lifecycle";
+
+    let s3_writer =
+        S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
+
+    let test_user_op_hash = B256::from_slice(&[70u8; 32]);
+    let test_sender = Address::from_slice(&[71u8; 20]);
+    let test_entry_point = Address::from_slice(&[72u8; 20]);
+    let test_tx_hash = TxHash::from_slice(&[73u8; 32]);
+
+    let lifecycle_events = vec![
+        UserOpEvent::AddedToMempool {
+            user_op_hash: test_user_op_hash,
+            sender: test_sender,
+            entry_point: test_entry_point,
+            nonce: U256::from(1),
+        },
+        UserOpEvent::Included {
+            user_op_hash: test_user_op_hash,
+            block_number: 12345,
+            tx_hash: test_tx_hash,
+        },
+    ];
+
+    let publisher = KafkaUserOpEventPublisher::new(harness.kafka_producer, topic.to_string());
+    for event in &lifecycle_events {
+        publisher.publish(event.clone()).await?;
+    }
+
+    let mut reader = KafkaUserOpAuditLogReader::new(harness.kafka_consumer, topic.to_string())?;
+
+    for _ in 0..lifecycle_events.len() {
+        let received = tokio::time::timeout(Duration::from_secs(10), reader.read_event()).await??;
+        s3_writer.archive_userop_event(received).await?;
+        reader.commit().await?;
+    }
+
+    let history = s3_writer.get_userop_history(test_user_op_hash).await?;
+    assert!(history.is_some());
+
+    let h = history.unwrap();
+    assert_eq!(
+        h.history.len(),
+        2,
+        "Full lifecycle should have 2 events (AddedToMempool, Included)"
+    );
+
+    Ok(())
+}
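Usage sketch (not part of the diff): the integration tests above show how the new UserOp archiver is constructed; the snippet below mirrors that wiring. The `kafka_consumer`, `s3_client`, `bucket_name`, and `topic` values are placeholders assumed to come from the host service's existing setup.

// Hypothetical wiring of the UserOp archiver, mirroring the tests; all inputs are placeholders.
let reader = KafkaUserOpAuditLogReader::new(kafka_consumer, topic.to_string())?;
let writer = S3EventReaderWriter::new(s3_client, bucket_name);
let mut archiver = KafkaUserOpAuditArchiver::new(reader, writer);
tokio::spawn(async move {
    // Runs the read -> archive -> commit loop and emits the `type = "userop"` tagged
    // metrics (Kafka read/commit durations, event age, events processed, in-flight
    // and failed archive tasks).
    archiver.run().await.expect("UserOp archiver exited");
});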