
Conversation

@MasterPtato
Contributor

No description provided.

Contributor Author

MasterPtato commented Jan 23, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@MasterPtato MasterPtato force-pushed the 01-22-chore_clean_up_pools_remove_unused_crates branch from 408d468 to 92a7f8a on January 23, 2026 01:09
@MasterPtato MasterPtato force-pushed the 01-22-do_not_merge_add_metrics_exporter_for_ee branch from 65c7cbd to b30c70d on January 23, 2026 01:09
@claude

claude bot commented Jan 23, 2026

PR Review: Metrics Exporter for EE

Summary: This PR adds Kafka-based metrics export functionality for the enterprise edition, allowing usage metrics to be exported periodically from each namespace to Kafka.

Code Quality & Best Practices

Strengths:

  • Well-structured workflow using the Gasoline workflow engine with proper activity separation
  • Good use of pagination for large metric sets to avoid transaction timeouts (2.5s early timeout)
  • Proper jitter implementation to prevent a thundering herd
  • Follows existing codebase patterns for database transactions and subspace usage

Issues Found:

  1. Hardcoded TODO in production code (metrics_exporter.rs:122)

    • project: "TODO".to_string() should be replaced with actual project identification
    • This will cause incorrect data in production
  2. Missing Display trait implementation (metric.rs:92-107)

    • MetricVariant::fmt should use the strum::Display derive instead of a manual implementation
    • The current approach is error-prone and doesn't leverage the existing strum dependency
  3. Inconsistent error handling (kafka.rs:14-15)

    • Reading secrets with .read() without documenting Secret type behavior
    • No error handling for invalid PEM or connection failures
  4. Documentation gaps

    • Missing module-level documentation for metrics_exporter.rs
    • No comments explaining the 1024-row chunking decision
    • EARLY_TXN_TIMEOUT lacks explanation of why 2.5 seconds was chosen
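For the constants in particular, a short doc comment on each would close the gap. A sketch (types and the stated rationale are assumptions for the author to correct):

use std::time::Duration;

/// Export usage metrics roughly once per minute; jitter is applied per namespace.
const EXPORT_INTERVAL_MS: i64 = 60_000;

/// Stop paginating well before the database transaction limit so the read can
/// still complete cleanly. (Assumed rationale.)
const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);

/// Number of metric rows serialized into a single Kafka payload.
const METRICS_BATCH_SIZE: usize = 1024;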

Performance Considerations

Good:

  • Chunking into 1024 rows prevents overwhelming Kafka
  • Using buffer_unordered(1024) for concurrent Kafka sends
  • Transaction timeout prevents long-running database locks
  • Proper use of streaming for large metric sets

Concerns:

  1. Unbounded metrics collection (metrics_exporter.rs:47-110)

    • The workflow collects all metrics into memory before sending
    • Could cause OOM for namespaces with millions of metrics
    • Consider streaming directly to Kafka instead of buffering
  2. No backpressure mechanism

    • If Kafka is slow/down, the workflow will fail and retry
    • No exponential backoff or circuit breaker pattern
  3. Missing metric cleanup

    • Metrics are exported but never cleared from the database
    • Will cause unbounded growth unless metrics are reset elsewhere

Security Concerns

  1. Secret handling (kafka.rs:14-15)

    • Secrets are passed to rdkafka as strings; ensure rdkafka doesn't log them
    • The CA PEM is also passed as a string - verify rdkafka doesn't leak it in debug output
  2. No validation on Kafka topic name

    • USAGE_METRICS_TOPIC is hardcoded but not validated in config
    • The topic must exist, or the producer will fail silently

Test Coverage

Critical Missing Tests:

  • No tests for the metrics exporter workflow
  • No tests for Kafka connection/failure scenarios
  • No tests for metric chunking logic
  • No tests for the pagination/timeout logic

Recommended test coverage:

  • Unit tests for UsageMetricsRow::serialize_compact()
  • Integration tests for the export workflow with mock Kafka
  • Tests for transaction timeout handling
  • Tests for metric collection pagination

Architecture & Design

Concerns:

  1. Tight coupling (lib.rs:795-815)

    • Gateway code directly dispatches metrics exporter workflow
    • This workflow dispatch happens on every HTTP/WebSocket request
    • The .unique() call prevents duplicates, but still requires a database lookup each time
    • Consider triggering this once per namespace lifecycle instead
  2. Workflow uniqueness

    • Uses .unique() without .idempotent() or error handling
    • What happens if the workflow is already running? Silent success?
  3. Timestamp precision (metrics_exporter.rs:124)

    • The comment says "Nanoseconds" but the code computes now * 1000 (microseconds, if now is in milliseconds)
    • Inconsistent with the comment; needs clarification

Specific Recommendations

  1. Fix the TODO immediately - this is blocking merge readiness
  2. Add exponential backoff for Kafka failures
  3. Add cleanup logic to delete exported metrics from the database
  4. Move workflow dispatch to namespace creation, not per-request
  5. Add comprehensive tests before merging
  6. Document the timestamp precision correctly
  7. Consider memory-efficient streaming instead of collecting all metrics

Additional Notes

  • The PR title says "DO NOT MERGE" - please update status when ready
  • Config schema properly updated with validation
  • Good adherence to repository error handling patterns with RivetError derive
  • Kafka dependency properly added to workspace

Verdict: This PR has good structure but needs significant work before merge:

  • ✅ Code structure and patterns
  • ❌ Production readiness (TODO in code)
  • ❌ Test coverage
  • ⚠️ Performance (memory concerns)
  • ⚠️ Architecture (per-request workflow dispatch)

Please address the TODO, add tests, and consider the architecture feedback before removing "DO NOT MERGE".

@claude

claude bot commented Jan 23, 2026

PR Review: Metrics Exporter for EE

This PR adds a Kafka-based metrics exporter for enterprise edition. Overall, the implementation is well-structured, but there are several issues to address before merging.


Critical Issues

1. TODO placeholder in production code (engine/packages/namespace/src/workflows/metrics_exporter.rs:122)

project: "TODO".to_string(),

The project field is hardcoded to "TODO". This should either:

  • Be derived from the namespace or config
  • Be passed as part of the workflow input
  • Have a clear plan for how it will be populated

2. Missing ctx.kafka() method implementation

The PR uses ctx.kafka()? in the activity (line 140), but I don't see where this method is added to ActivityCtx. You'll need to:

  • Add a kafka() method to the appropriate context trait
  • Ensure it returns Result<rdkafka::producer::FutureProducer>
  • Follow the pattern used by ctx.udb()? and ctx.config()

3. Potential memory issue with unbounded metrics collection (metrics_exporter.rs:47-111)

The code collects all metrics into a Vec before processing:

let mut metrics = Vec::new();
// ... loop that extends metrics
metrics.extend(new_metrics);

For namespaces with many actors/high activity, this could consume significant memory. Consider:

  • Processing and sending metrics in batches as you collect them
  • Adding a limit on total metrics per export cycle
  • Adding memory usage monitoring/logging
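A rough sketch of the batched shape, with fetch_page and send_batch as hypothetical stand-ins for the real pagination and Kafka calls:

use anyhow::Result;

// Hypothetical stand-ins for the real pagination and Kafka send calls.
async fn fetch_page(cursor: Option<Vec<u8>>) -> Result<(Vec<String>, Option<Vec<u8>>)> {
    unimplemented!()
}

async fn send_batch(rows: &[String]) -> Result<()> {
    unimplemented!()
}

/// Sends each page to Kafka before fetching the next one, so memory stays
/// bounded by the page size rather than the total metric count.
async fn export_metrics_streaming() -> Result<()> {
    let mut cursor = None;
    loop {
        let (rows, next) = fetch_page(cursor).await?;
        if !rows.is_empty() {
            send_batch(&rows).await?;
        }
        match next {
            Some(c) => cursor = Some(c),
            None => return Ok(()),
        }
    }
}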

Bugs & Logic Issues

4. Empty payload sent to Kafka (metrics_exporter.rs:116-138)

When payloads is initialized with vec![String::new()], if there are no metrics, you'll send an empty string to Kafka. Add a check:

// Remove empty payloads before sending
let payloads: Vec<_> = payloads.into_iter().filter(|p| !p.is_empty()).collect();
if payloads.is_empty() {
    return Ok(());
}

5. Timestamp units mismatch (metrics_exporter.rs:166-167)

Comment says "Nanoseconds" but the code does:

/// Nanoseconds.
pub timestamp: i64,
// ...
timestamp: now * 1000,  // now is milliseconds, * 1000 = microseconds, not nanoseconds

If ClickHouse expects nanoseconds, multiply by 1,000,000 instead of 1,000. If it expects microseconds, update the comment. Verify the expected format.
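Assuming now is in milliseconds and ClickHouse does expect nanoseconds, the fix is a single factor:

// Convert milliseconds to nanoseconds (assuming `now` is in milliseconds).
timestamp: now * 1_000_000,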

6. Race condition in workflow initialization (pegboard-gateway/src/lib.rs:164, 294)

The metrics exporter workflow is initialized on every HTTP and WebSocket request via init_ns_metrics_exporter. While .unique() prevents duplicates, this creates unnecessary database queries on every request. Consider:

  • Initializing the workflow once when the namespace is created
  • Caching whether the workflow has been dispatched
  • Moving initialization to a namespace lifecycle event
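A sketch of the caching option, a process-local set so repeated requests skip the dispatch path (the names here are hypothetical, and .unique() still handles cross-process races):

use std::collections::HashSet;
use std::sync::{Mutex, OnceLock};

// Hypothetical process-local cache of namespaces whose exporter workflow has
// already been dispatched by this process; .unique() still deduplicates the
// workflow itself across processes.
static DISPATCHED_NAMESPACES: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();

/// Returns true the first time a namespace is seen by this process.
fn should_dispatch_exporter(namespace_id: &str) -> bool {
    let cache = DISPATCHED_NAMESPACES.get_or_init(|| Mutex::new(HashSet::new()));
    cache.lock().unwrap().insert(namespace_id.to_string())
}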

Performance Considerations

7. Buffer size could be tuned (metrics_exporter.rs:156)

buffer_unordered(1024) processes up to 1024 Kafka sends concurrently. This might overwhelm the Kafka producer. Consider:

  • Testing with different buffer sizes under load
  • Using a configurable value
  • Monitoring Kafka producer metrics for backpressure

8. Missing transaction timeout handling (metrics_exporter.rs:81-83)

The early-timeout path logs a warning but continues to process incomplete data:

if start.elapsed() > EARLY_TXN_TIMEOUT {
    tracing::warn!("timed out processing pending actors metrics");
    break;  // Breaks inner loop, continues with partial data
}

This could lead to:

  • Inconsistent metric exports (some actors included, others not)
  • Silent data loss

Consider retrying the transaction or returning an error to trigger workflow retry.
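A sketch of the second option, failing the activity instead of the break shown above (the error message is illustrative):

if start.elapsed() > EARLY_TXN_TIMEOUT {
    // Illustrative: fail the activity instead of breaking, so the workflow
    // engine retries rather than exporting an incomplete snapshot.
    anyhow::bail!("timed out while paginating pending actor metrics");
}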


Code Quality

9. Inconsistent error logging (metrics_exporter.rs:82)

Following CLAUDE.md logging conventions, this should include structured context:

tracing::warn!(
    ?namespace_id,
    elapsed_ms = start.elapsed().as_millis(),
    "timed out processing metrics"
);

10. Magic number 1024 (metrics_exporter.rs:134)

The chunking size 1024 appears in multiple places. Extract to a constant:

const METRICS_BATCH_SIZE: usize = 1024;

11. Missing metric name serialization (keys/metric.rs:78-107)

MetricVariant is now a pub enum, but the Display implementation would be more maintainable with a strum::Display derive instead of a manual implementation:

#[derive(strum::FromRepr, strum::Display)]
#[strum(serialize_all = "snake_case")]
pub enum MetricVariant {
    ActorAwake,
    // ...
}

Security

12. Kafka credentials handling (pools/src/db/kafka.rs:14-15)

Good use of Secret<String> type and .read() method. Ensure these values aren't logged. The #[tracing::instrument(skip(config))] is correct.

13. SSL configuration (pools/src/db/kafka.rs:11-15)

The SSL configuration looks correct with SASL_SSL and certificate validation. Verify that:

  • The CA certificate is properly validated (rdkafka should do this by default)
  • Certificate verification isn't accidentally disabled elsewhere

Testing

14. Missing tests

No test files are included in this PR. Consider adding:

  • Unit tests for UsageMetricsRow::serialize_compact()
  • Integration tests for the metrics exporter workflow
  • Tests for edge cases (empty metrics, timeout scenarios, Kafka failures)
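Even a small unit test of the empty-payload filter suggested in item 4 above would lock in that edge case. A sketch, with the filter pulled into a hypothetical helper for testability:

// Hypothetical helper extracted from the activity so it can be unit tested.
fn drop_empty_payloads(payloads: Vec<String>) -> Vec<String> {
    payloads.into_iter().filter(|p| !p.is_empty()).collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_payloads_are_not_sent() {
        let payloads = vec![String::new(), "row1\nrow2".to_string(), String::new()];
        assert_eq!(
            drop_empty_payloads(payloads),
            vec!["row1\nrow2".to_string()]
        );
    }
}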

Documentation

15. Missing workflow documentation

Add a module-level comment explaining:

  • When this workflow runs (every minute per namespace)
  • What happens if Kafka is unavailable
  • How metrics are aggregated/reset
  • The expected ClickHouse schema
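A sketch of a module header covering those points (the failure behavior stated here is an assumption for the author to verify):

//! Periodically exports per-namespace usage metrics to Kafka for ingestion
//! into ClickHouse (see `UsageMetricsRow` for the expected row shape).
//!
//! The workflow loops roughly once per `EXPORT_INTERVAL_MS` with jitter,
//! pages through the namespace's pending metrics, and publishes them to the
//! usage metrics topic in batches.
//!
//! If Kafka is unavailable, the send activity fails and is retried by the
//! workflow engine. (Assumption; confirm and document the real behavior.)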

16. Config schema documentation (config/kafka.rs:7-14)

Add doc comments for the config fields:

/// Kafka configuration for metrics export
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
pub struct Kafka {
    /// Kafka broker URL (e.g., "kafka.example.com:9092")
    pub url: Url,
    // ...
}

Dependencies

17. Dependency versioning (Cargo.toml:138-140)

Good practice using workspace dependencies. The rdkafka SSL feature is correctly enabled.

18. Missing rdkafka optional dependency handling

Since Kafka is optional in config, should rdkafka be an optional dependency? This would reduce binary size when not using the metrics exporter:

[dependencies]
rdkafka = { workspace = true, optional = true }

[features]
metrics-export = ["rdkafka"]

Summary

Priority fixes before merge:

  1. ✅ Resolve the TODO placeholder
  2. ✅ Implement ctx.kafka() method
  3. ✅ Fix timestamp units mismatch
  4. ✅ Handle empty payloads
  5. ✅ Address memory concerns with unbounded metrics collection

Recommended improvements:

  • Add tests
  • Optimize workflow initialization (avoid per-request dispatch)
  • Improve error handling for transaction timeouts
  • Add documentation

Minor polish:

  • Extract magic numbers to constants
  • Improve structured logging
  • Consider optional dependency compilation

The core architecture is sound - using workflows for periodic exports and batching for Kafka is the right approach. Once the critical issues are addressed, this should work well.

@pkg-pr-new

pkg-pr-new bot commented Jan 23, 2026

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@4014

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@4014

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@4014

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@4014

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@4014

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@4014

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@4014

@rivetkit/virtual-websocket

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/virtual-websocket@4014

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@4014

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@4014

commit: 22c21f0

@claude

claude bot commented Jan 23, 2026

Code Review: Metrics Exporter for Enterprise Edition

Overview

This PR adds a Kafka-based metrics exporter for collecting and transmitting usage metrics. The implementation follows good architectural patterns but has several areas that need attention before merging.


Critical Issues

1. Hardcoded TODO Value (engine/packages/namespace/src/workflows/metrics_exporter.rs:127)

project: "TODO".to_string(),

Issue: The project field is hardcoded to "TODO", which will cause incorrect data in production.

Recommendation: Determine where the project identifier should come from (config, namespace metadata, etc.) and implement proper resolution. This should be resolved before merge.


Security Concerns

2. Sensitive Credentials in Logs

Location: engine/packages/pools/src/db/kafka.rs:7

The tracing statement tracing::debug!("kafka connecting") is fine, but ensure no secrets are logged in error cases. The rdkafka library may include connection details in error messages.

Recommendation: Verify error handling doesn't leak credentials and consider wrapping the connection setup with explicit error sanitization.


Code Quality Issues

3. Missing Display Trait Derivation

Location: engine/packages/namespace/src/keys/metric.rs:78

MetricVariant implements std::fmt::Display manually, but it should also derive strum::Display for consistency since it already uses strum::FromRepr.

Current:

#[derive(strum::FromRepr)]
pub enum MetricVariant { ... }

impl std::fmt::Display for MetricVariant { ... }

Recommendation:

#[derive(strum::FromRepr, strum::Display)]
#[strum(serialize_all = "snake_case")]
pub enum MetricVariant { ... }

This would eliminate the manual Display implementation and ensure consistency.

4. Inconsistent Error Handling

Location: engine/packages/namespace/src/workflows/metrics_exporter.rs:156

The Kafka send error handling discards the message:

.map_err(|(err, _)| anyhow::Error::from(err))

Issue: When a Kafka send fails, the message is lost without any retry mechanism or dead letter queue.

Recommendation: Consider:

  • Implementing retry logic with exponential backoff
  • Logging failed metrics for manual recovery
  • Adding a circuit breaker pattern if Kafka is consistently unavailable
  • Or at minimum, add a warning log when metrics are dropped
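A minimal retry-with-backoff sketch around a generic async send (the wrapper and its constants are illustrative, not an existing API in this codebase):

use std::time::Duration;

use anyhow::Result;

// Illustrative wrapper: retries an async send with exponential backoff and
// logs each failed attempt before giving up.
async fn send_with_backoff<F, Fut>(mut send: F, max_attempts: u32) -> Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    let mut delay = Duration::from_millis(100);
    let mut attempt = 0;
    loop {
        attempt += 1;
        match send().await {
            Ok(()) => return Ok(()),
            Err(err) if attempt < max_attempts => {
                tracing::warn!(?err, attempt, "kafka send failed, retrying");
                tokio::time::sleep(delay).await;
                // Double the delay each retry, capped at ten seconds.
                delay = (delay * 2).min(Duration::from_secs(10));
            }
            Err(err) => return Err(err),
        }
    }
}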

5. Transaction Timeout Logic

Location: engine/packages/namespace/src/workflows/metrics_exporter.rs:82-84

if start.elapsed() > EARLY_TXN_TIMEOUT {
    tracing::warn!("timed out processing pending actors metrics");
    break;
}

Issue: When the timeout occurs, partial metrics are exported and the continuation logic via last_key may skip metrics that weren't processed in time.

Recommendation: Add more context to the warning log (how many metrics were processed, what the last key was) and consider if this timeout is appropriate or if chunking should be handled differently.


Performance Considerations

6. String Allocations in Hot Path

Location: engine/packages/namespace/src/keys/metric.rs:46-75

The attributes() method builds a new HashMap<String, String> with cloned strings for every exported metric.

Recommendation: Consider:

  • Using &'static str for keys and Cow<str> for values where possible
  • Pre-allocating HashMap with known capacity
  • Or caching attribute maps if actor names are limited
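A sketch of what that could look like, with static keys and a pre-sized map (the signature and key names are hypothetical):

use std::borrow::Cow;
use std::collections::HashMap;

// Hypothetical shape: static keys, borrowed values, and a map allocated at
// its final capacity, so the hot path avoids per-metric String clones.
fn attributes(actor_name: &str) -> HashMap<&'static str, Cow<'_, str>> {
    let mut attrs = HashMap::with_capacity(2);
    attrs.insert("metric_variant", Cow::Borrowed("actor_awake"));
    attrs.insert("actor_name", Cow::Borrowed(actor_name));
    attrs
}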

7. Unbounded Metrics Collection

Location: engine/packages/namespace/src/workflows/metrics_exporter.rs:48-112

The outer loop collects all metrics into a single Vec before processing. For namespaces with many actors, this could consume significant memory.

Current behavior: Chunks DB reads via timeout, but accumulates all results in memory.

Recommendation: Consider streaming metrics directly to Kafka instead of accumulating them all in memory first. Process and send each chunk before fetching the next.

8. Buffer Size Configuration

Location: engine/packages/namespace/src/workflows/metrics_exporter.rs:159

.buffer_unordered(1024)

The buffer size of 1024 concurrent Kafka sends matches the chunk size, which could create significant memory pressure and network load spikes.

Recommendation: Consider making this configurable or using a smaller buffer (e.g., 32-64) for more controlled resource usage.


Best Practices & Style

9. Magic Numbers

Several magic numbers lack explanation:

  • EXPORT_INTERVAL_MS: 1 minute (line 12) - Good with constant
  • EARLY_TXN_TIMEOUT: 2500ms (line 13) - Good with constant
  • 1024: Used for chunking (lines 139, 159) - Should be a named constant
  • jitter calculation: EXPORT_INTERVAL_MS / 10 (line 29) - Should explain why 10%

Recommendation: Extract to named constants with documentation.

10. Logging Standards

Location: engine/packages/namespace/src/workflows/metrics_exporter.rs:83

tracing::warn!("timed out processing pending actors metrics");

According to CLAUDE.md, log messages should use structured logging parameters, not formatted strings.

Recommendation:

tracing::warn!(
    elapsed_ms = ?start.elapsed().as_millis(),
    "timed out processing pending actors metrics"
);

11. Comment Quality

Location: engine/packages/namespace/src/workflows/metrics_exporter.rs:118

// Chunk metrics into 1024 rows

Per CLAUDE.md guidelines, comments should be complete sentences.

Recommendation: "Chunk metrics into batches of 1024 rows for Kafka transmission."


Testing Gaps

12. Missing Test Coverage

No tests are included for:

  • Metrics serialization format (UsageMetricsRow::serialize_compact)
  • Pagination logic with timeout handling
  • Kafka connection failure scenarios
  • Metric attribute generation

Recommendation: Add unit tests for:

  • UsageMetricsRow::serialize_compact() output format validation
  • Metric::attributes() correctness for all variants
  • Error handling paths

Documentation

13. Missing Documentation

The metrics exporter workflow and activity lack doc comments explaining:

  • What triggers the workflow
  • Expected Kafka topic schema
  • Retry/failure behavior
  • Resource usage characteristics

Recommendation: Add module-level and function-level doc comments, especially for public APIs like Metric::variant() and Metric::attributes().


Architecture & Design

14. Kafka Producer Lifecycle

Location: engine/packages/pools/src/pools.rs:87

The Kafka producer is created once at pool initialization but may fail silently if Kafka is unavailable at startup.

Recommendation: Consider health checks and reconnection logic, or document the fail-fast behavior.

15. Workflow Initialization Timing

Location: engine/packages/pegboard-gateway/src/lib.rs:167, 297

The metrics exporter workflow is initialized on every HTTP and WebSocket request via tokio::try_join!.

Concerns:

  • Is this initialization idempotent? (appears to be via .unique())
  • Could this add latency to request handling?
  • Should this be initialized elsewhere (on namespace creation)?

Recommendation: Consider moving workflow initialization to namespace creation time or document why per-request initialization is preferred.


Minor Issues

16. Workspace Dependency Formatting

Location: Cargo.toml:138-140

    [workspace.dependencies.rdkafka]
    version = "0.38.0"
    features = [ "ssl" ]

Issue: Inconsistent indentation (4 spaces vs 2 spaces used elsewhere in the file).

Recommendation: Use 2-space indentation to match the file's style.

17. Unnecessary Dependency

Location: Cargo.lock:2533

  dependencies = [
   "cc",
+  "libc",
   "pkg-config",

Verify if this libc dependency is necessary or if it's being pulled in transitively.


Summary

Severity Breakdown:

  • 🔴 Critical (1): Hardcoded TODO value
  • 🟡 High (5): Error handling, security, performance issues
  • 🟢 Medium (11): Code quality, testing, documentation

Recommendation: Address the critical TODO issue and error handling concerns before merging. Consider the performance implications for high-traffic namespaces.

Positive Notes:

  • ✅ Good use of jitter to prevent thundering herd
  • ✅ Proper structured logging with tracing
  • ✅ Follows workspace dependency patterns
  • ✅ Transaction batching to avoid long-running transactions
  • ✅ Proper use of workflow uniqueness constraints

@MasterPtato MasterPtato changed the base branch from 01-22-chore_clean_up_pools_remove_unused_crates to graphite-base/4014 January 26, 2026 21:23
@MasterPtato MasterPtato force-pushed the 01-22-do_not_merge_add_metrics_exporter_for_ee branch from b30c70d to bd52f31 on January 26, 2026 21:23
@MasterPtato MasterPtato changed the base branch from graphite-base/4014 to 01-23-fix_move_actor_metrics_to_separate_workflow January 26, 2026 21:23
@MasterPtato MasterPtato force-pushed the 01-23-fix_move_actor_metrics_to_separate_workflow branch from 91e86f9 to 58af8e5 on January 27, 2026 03:16
@graphite-app graphite-app bot changed the base branch from 01-23-fix_move_actor_metrics_to_separate_workflow to graphite-base/4014 January 27, 2026 03:19
@MasterPtato MasterPtato force-pushed the 01-22-do_not_merge_add_metrics_exporter_for_ee branch from bd52f31 to 22c21f0 on January 27, 2026 19:05
@MasterPtato MasterPtato changed the base branch from graphite-base/4014 to main January 27, 2026 19:05
@MasterPtato MasterPtato marked this pull request as ready for review January 27, 2026 23:18