From f1254012eaf7dcc27be5063afb0bb3aa1efe0c14 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Fri, 26 Sep 2025 14:25:38 -0700 Subject: [PATCH 1/5] Define metrics with Prometheus integration, also updated cursorrules Signed-off-by: Tim Li --- .cursorrules | 41 +++ cadence/_internal/visibility/__init__.py | 11 + cadence/_internal/visibility/metrics.py | 144 +++++++++ cadence/_internal/visibility/prometheus.py | 295 ++++++++++++++++++ pyproject.toml | 1 + .../_internal/visibility/test_metrics.py | 129 ++++++++ .../_internal/visibility/test_prometheus.py | 281 +++++++++++++++++ uv.lock | 11 + 8 files changed, 913 insertions(+) create mode 100644 .cursorrules create mode 100644 cadence/_internal/visibility/__init__.py create mode 100644 cadence/_internal/visibility/metrics.py create mode 100644 cadence/_internal/visibility/prometheus.py create mode 100644 tests/cadence/_internal/visibility/test_metrics.py create mode 100644 tests/cadence/_internal/visibility/test_prometheus.py diff --git a/.cursorrules b/.cursorrules new file mode 100644 index 0000000..a4e375b --- /dev/null +++ b/.cursorrules @@ -0,0 +1,41 @@ +# Cursor Rules for Cadence Python Client + +## Package Management +- **Always use `uv` for Python package management** +- Use `uv run` for running Python commands instead of `python` directly +- Use `uv sync` for installing dependencies instead of `pip install` +- Use `uv tool run` for running development tools (pytest, mypy, ruff, etc.) +- Only use `pip` or `python` directly when specifically required by the tool or documentation + +## Examples +```bash +# ✅ Correct +uv run python scripts/generate_proto.py +uv run python -m pytest tests/ +uv tool run mypy cadence/ +uv tool run ruff check + +# ❌ Avoid +python scripts/generate_proto.py +pip install -e ".[dev]" +``` + +## Virtual Environment +- The project uses `uv` for virtual environment management +- Always activate the virtual environment using `uv` commands +- Dependencies are managed through `pyproject.toml` and `uv.lock` + +## Testing +- Run tests with `uv run python -m pytest` +- Use `uv run` for any Python script execution +- Development tools should be run with `uv tool run` + +## Code Generation +- Use `uv run python scripts/generate_proto.py` for protobuf generation +- Use `uv run python scripts/dev.py` for development tasks + +## Code Quality +- Run linter with auto-fix: `uv tool run ruff check --fix` +- Run type checking: `uv tool run mypy cadence/` +- Always run both commands before committing code changes +- Use `uv tool run ruff check --fix && uv tool run mypy cadence/` to run both together diff --git a/cadence/_internal/visibility/__init__.py b/cadence/_internal/visibility/__init__.py new file mode 100644 index 0000000..70fe020 --- /dev/null +++ b/cadence/_internal/visibility/__init__.py @@ -0,0 +1,11 @@ +"""Visibility and metrics collection components for Cadence client.""" + +from .metrics import MetricsRegistry, get_default_registry +from .prometheus import PrometheusMetrics, PrometheusConfig + +__all__ = [ + "MetricsRegistry", + "get_default_registry", + "PrometheusMetrics", + "PrometheusConfig", +] diff --git a/cadence/_internal/visibility/metrics.py b/cadence/_internal/visibility/metrics.py new file mode 100644 index 0000000..5ecefb1 --- /dev/null +++ b/cadence/_internal/visibility/metrics.py @@ -0,0 +1,144 @@ +"""Core metrics collection interface and registry for Cadence client.""" + +import logging +from enum import Enum +from typing import Dict, Optional, Protocol, Set + +logger = logging.getLogger(__name__) + + +class MetricType(Enum): + """Types of metrics that can be collected.""" + + COUNTER = "counter" + GAUGE = "gauge" + HISTOGRAM = "histogram" + SUMMARY = "summary" + + +class MetricsCollector(Protocol): + """Protocol for metrics collection backends.""" + + def counter( + self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a counter metric.""" + ... + + def gauge( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a gauge metric.""" + ... + + def timer( + self, key: str, duration: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a timer metric.""" + ... + + def histogram( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a histogram metric.""" + ... + + +class NoOpMetricsCollector: + """No-op metrics collector that discards all metrics.""" + + def counter( + self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None + ) -> None: + pass + + def gauge( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + pass + + def timer( + self, key: str, duration: float, tags: Optional[Dict[str, str]] = None + ) -> None: + pass + + def histogram( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + pass + + +class MetricsRegistry: + """Registry for managing metrics collection in the Cadence client.""" + + def __init__(self, collector: Optional[MetricsCollector] = None): + self._collector = collector or NoOpMetricsCollector() + self._registered_metrics: Set[str] = set() + + def set_collector(self, collector: MetricsCollector) -> None: + """Set the metrics collector backend.""" + self._collector = collector + logger.info(f"Metrics collector set to {type(collector).__name__}") + + def register_metric(self, name: str, metric_type: MetricType) -> None: + """Register a metric with the registry.""" + if name in self._registered_metrics: + logger.warning(f"Metric {name} already registered") + return + + self._registered_metrics.add(name) + logger.debug(f"Registered {metric_type.value} metric: {name}") + + def counter( + self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a counter metric.""" + try: + self._collector.counter(key, n, tags) + except Exception as e: + logger.error(f"Failed to send counter {key}: {e}") + + def gauge( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a gauge metric.""" + try: + self._collector.gauge(key, value, tags) + except Exception as e: + logger.error(f"Failed to send gauge {key}: {e}") + + def timer( + self, key: str, duration: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a timer metric.""" + try: + self._collector.timer(key, duration, tags) + except Exception as e: + logger.error(f"Failed to send timer {key}: {e}") + + def histogram( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a histogram metric.""" + try: + self._collector.histogram(key, value, tags) + except Exception as e: + logger.error(f"Failed to send histogram {key}: {e}") + + +# Global default registry +_default_registry: Optional[MetricsRegistry] = None + + +def get_default_registry() -> MetricsRegistry: + """Get the default global metrics registry.""" + global _default_registry + if _default_registry is None: + _default_registry = MetricsRegistry() + return _default_registry + + +def set_default_registry(registry: MetricsRegistry) -> None: + """Set the default global metrics registry.""" + global _default_registry + _default_registry = registry diff --git a/cadence/_internal/visibility/prometheus.py b/cadence/_internal/visibility/prometheus.py new file mode 100644 index 0000000..98b8500 --- /dev/null +++ b/cadence/_internal/visibility/prometheus.py @@ -0,0 +1,295 @@ +"""Prometheus metrics integration for Cadence client.""" + +import logging +from dataclasses import dataclass, field +from typing import Dict, Optional, Any + +from prometheus_client import ( # type: ignore[import-not-found] + REGISTRY, + CollectorRegistry, + Counter, + Gauge, + Histogram, + Summary, + generate_latest, + push_to_gateway, + start_http_server, +) + + +logger = logging.getLogger(__name__) + + +@dataclass +class PrometheusConfig: + """Configuration for Prometheus metrics.""" + + # HTTP server configuration + enable_http_server: bool = False + http_port: int = 8000 + http_addr: str = "0.0.0.0" + + # Push gateway configuration + enable_push_gateway: bool = False + push_gateway_url: str = "localhost:9091" + push_job_name: str = "cadence_client" + + # Metric name prefix + metric_prefix: str = "cadence_" + + # Default labels to apply to all metrics + default_labels: Dict[str, str] = field(default_factory=dict) + + # Custom registry (if None, uses default global registry) + registry: Optional[CollectorRegistry] = None + + +class PrometheusMetrics: + """Prometheus metrics collector implementation.""" + + def __init__(self, config: Optional[PrometheusConfig] = None): + self.config = config or PrometheusConfig() + self.registry = self.config.registry or REGISTRY + + # Track created metrics to avoid duplicates + self._counters: Dict[str, Counter] = {} + self._gauges: Dict[str, Gauge] = {} + self._histograms: Dict[str, Histogram] = {} + self._summaries: Dict[str, Summary] = {} + + # HTTP server handle + self._http_server: Optional[Any] = None + + if self.config.enable_http_server: + self.start_http_server() + + def _get_metric_name(self, name: str) -> str: + """Get the full metric name with prefix.""" + return f"{self.config.metric_prefix}{name}" + + def _merge_labels(self, labels: Optional[Dict[str, str]]) -> Dict[str, str]: + """Merge provided labels with default labels.""" + merged = self.config.default_labels.copy() + if labels: + merged.update(labels) + return merged + + def _get_or_create_counter( + self, name: str, labels: Optional[Dict[str, str]] + ) -> Counter: + """Get or create a Counter metric.""" + metric_name = self._get_metric_name(name) + + if metric_name not in self._counters: + label_names = list(self._merge_labels(labels).keys()) if labels else [] + self._counters[metric_name] = Counter( + metric_name, + f"Counter metric for {name}", + labelnames=label_names, + registry=self.registry, + ) + logger.debug(f"Created counter metric: {metric_name}") + + return self._counters[metric_name] + + def _get_or_create_gauge( + self, name: str, labels: Optional[Dict[str, str]] + ) -> Gauge: + """Get or create a Gauge metric.""" + metric_name = self._get_metric_name(name) + + if metric_name not in self._gauges: + label_names = list(self._merge_labels(labels).keys()) if labels else [] + self._gauges[metric_name] = Gauge( + metric_name, + f"Gauge metric for {name}", + labelnames=label_names, + registry=self.registry, + ) + logger.debug(f"Created gauge metric: {metric_name}") + + return self._gauges[metric_name] + + def _get_or_create_histogram( + self, name: str, labels: Optional[Dict[str, str]] + ) -> Histogram: + """Get or create a Histogram metric.""" + metric_name = self._get_metric_name(name) + + if metric_name not in self._histograms: + label_names = list(self._merge_labels(labels).keys()) if labels else [] + self._histograms[metric_name] = Histogram( + metric_name, + f"Histogram metric for {name}", + labelnames=label_names, + registry=self.registry, + ) + logger.debug(f"Created histogram metric: {metric_name}") + + return self._histograms[metric_name] + + def _get_or_create_summary( + self, name: str, labels: Optional[Dict[str, str]] + ) -> Summary: + """Get or create a Summary metric.""" + metric_name = self._get_metric_name(name) + + if metric_name not in self._summaries: + label_names = list(self._merge_labels(labels).keys()) if labels else [] + self._summaries[metric_name] = Summary( + metric_name, + f"Summary metric for {name}", + labelnames=label_names, + registry=self.registry, + ) + logger.debug(f"Created summary metric: {metric_name}") + + return self._summaries[metric_name] + + def count( + self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a count metric (aligned with M3.count).""" + try: + counter = self._get_or_create_counter(key, tags) + merged_tags = self._merge_labels(tags) + + if merged_tags: + counter.labels(**merged_tags).inc(n) + else: + counter.inc(n) + + except Exception as e: + logger.error(f"Failed to send count {key}: {e}") + + def gauge( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a gauge metric (aligned with M3.gauge).""" + try: + gauge = self._get_or_create_gauge(key, tags) + merged_tags = self._merge_labels(tags) + + if merged_tags: + gauge.labels(**merged_tags).set(value) + else: + gauge.set(value) + + except Exception as e: + logger.error(f"Failed to send gauge {key}: {e}") + + def timing( + self, key: str, duration: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a timing metric (aligned with M3.timing) - implemented as histogram.""" + try: + histogram = self._get_or_create_histogram(key, tags) + merged_tags = self._merge_labels(tags) + + if merged_tags: + histogram.labels(**merged_tags).observe(duration) + else: + histogram.observe(duration) + + except Exception as e: + logger.error(f"Failed to send timing {key}: {e}") + + def histogram( + self, key: str, value: float, tags: Optional[Dict[str, str]] = None + ) -> None: + """Send a histogram metric (aligned with M3.histogram).""" + try: + histogram = self._get_or_create_histogram(key, tags) + merged_tags = self._merge_labels(tags) + + if merged_tags: + histogram.labels(**merged_tags).observe(value) + else: + histogram.observe(value) + + except Exception as e: + logger.error(f"Failed to send histogram {key}: {e}") + + def start_http_server(self) -> None: + """Start HTTP server to expose metrics.""" + if self._http_server is not None: + logger.warning("HTTP server already started") + return + + try: + server_result = start_http_server( + self.config.http_port, + addr=self.config.http_addr, + registry=self.registry, + ) + self._http_server = server_result + logger.info( + f"Prometheus metrics HTTP server started on " + f"{self.config.http_addr}:{self.config.http_port}" + ) + except Exception as e: + logger.error(f"Failed to start HTTP server: {e}") + raise + + def push_to_gateway(self) -> None: + """Push metrics to Prometheus Push Gateway.""" + if not self.config.enable_push_gateway: + logger.warning("Push gateway not enabled") + return + + try: + push_to_gateway( + self.config.push_gateway_url, + job=self.config.push_job_name, + registry=self.registry, + ) + logger.debug(f"Pushed metrics to gateway: {self.config.push_gateway_url}") + except Exception as e: + logger.error(f"Failed to push to gateway: {e}") + raise + + def get_metrics_text(self) -> str: + """Get metrics in Prometheus text format.""" + try: + metrics_bytes = generate_latest(self.registry) + return metrics_bytes.decode("utf-8") # type: ignore[no-any-return] + except Exception as e: + logger.error(f"Failed to generate metrics text: {e}") + return "" + + def shutdown(self) -> None: + """Shutdown the metrics collector.""" + if self._http_server: + try: + self._http_server.shutdown() + self._http_server = None + logger.info("Prometheus HTTP server shutdown") + except Exception as e: + logger.error(f"Failed to shutdown HTTP server: {e}") + + +# Default Cadence metrics names +class CadenceMetrics: + """Standard Cadence client metrics.""" + + # Workflow metrics + WORKFLOW_STARTED_TOTAL = "workflow_started_total" + WORKFLOW_COMPLETED_TOTAL = "workflow_completed_total" + WORKFLOW_FAILED_TOTAL = "workflow_failed_total" + WORKFLOW_DURATION_SECONDS = "workflow_duration_seconds" + + # Activity metrics + ACTIVITY_STARTED_TOTAL = "activity_started_total" + ACTIVITY_COMPLETED_TOTAL = "activity_completed_total" + ACTIVITY_FAILED_TOTAL = "activity_failed_total" + ACTIVITY_DURATION_SECONDS = "activity_duration_seconds" + + # Worker metrics + WORKER_TASK_POLLS_TOTAL = "worker_task_polls_total" + WORKER_TASK_POLL_ERRORS_TOTAL = "worker_task_poll_errors_total" + WORKER_ACTIVE_TASKS = "worker_active_tasks" + + # Client metrics + CLIENT_REQUESTS_TOTAL = "client_requests_total" + CLIENT_REQUEST_DURATION_SECONDS = "client_request_duration_seconds" + CLIENT_REQUEST_ERRORS_TOTAL = "client_request_errors_total" diff --git a/pyproject.toml b/pyproject.toml index b860554..2dabe83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "msgspec>=0.19.0", "protobuf==5.29.1", "typing-extensions>=4.0.0", + "prometheus-client>=0.21.0", ] [project.optional-dependencies] diff --git a/tests/cadence/_internal/visibility/test_metrics.py b/tests/cadence/_internal/visibility/test_metrics.py new file mode 100644 index 0000000..9183e39 --- /dev/null +++ b/tests/cadence/_internal/visibility/test_metrics.py @@ -0,0 +1,129 @@ +"""Tests for metrics collection functionality.""" + +from unittest.mock import Mock + + +from cadence._internal.visibility.metrics import ( + MetricsRegistry, + MetricType, + NoOpMetricsCollector, + get_default_registry, + set_default_registry, +) + + +class TestMetricsRegistry: + """Test cases for MetricsRegistry.""" + + def test_registry_with_no_collector(self): + """Test registry with default no-op collector.""" + registry = MetricsRegistry() + + # Should not raise any exceptions + registry.counter("test_counter", 1) + registry.gauge("test_gauge", 42.0) + registry.histogram("test_histogram", 0.5) + registry.timer("test_timing", 1.5) + + def test_registry_with_mock_collector(self): + """Test registry with mock collector.""" + mock_collector = Mock() + registry = MetricsRegistry(mock_collector) + + # Test counter + registry.counter("test_counter", 2, {"label": "value"}) + mock_collector.counter.assert_called_once_with( + "test_counter", 2, {"label": "value"} + ) + + # Test gauge + registry.gauge("test_gauge", 100.0, {"env": "test"}) + mock_collector.gauge.assert_called_once_with( + "test_gauge", 100.0, {"env": "test"} + ) + + # Test timer + registry.timer("test_timing", 0.75) + mock_collector.timer.assert_called_once_with( + "test_timing", 0.75, None + ) + + # Test histogram + registry.histogram("test_histogram", 2.5) + mock_collector.histogram.assert_called_once_with( + "test_histogram", 2.5, None + ) + + def test_set_collector(self): + """Test setting a new collector.""" + registry = MetricsRegistry() + mock_collector = Mock() + + registry.set_collector(mock_collector) + registry.counter("test", 1) + + mock_collector.counter.assert_called_once_with("test", 1, None) + + def test_register_metric(self): + """Test metric registration.""" + registry = MetricsRegistry() + + registry.register_metric("test_counter", MetricType.COUNTER) + registry.register_metric("test_gauge", MetricType.GAUGE) + + # Registering the same metric twice should not raise an error + registry.register_metric("test_counter", MetricType.COUNTER) + + def test_collector_exception_handling(self): + """Test that collector exceptions are handled gracefully.""" + mock_collector = Mock() + mock_collector.counter.side_effect = Exception("Test exception") + + registry = MetricsRegistry(mock_collector) + + # Should not raise exception, but log error + registry.counter("test", 1) + + mock_collector.counter.assert_called_once() + + +class TestNoOpMetricsCollector: + """Test cases for NoOpMetricsCollector.""" + + def test_no_op_collector(self): + """Test that no-op collector doesn't raise exceptions.""" + collector = NoOpMetricsCollector() + + # Should not raise any exceptions + collector.counter("test", 1, {"label": "value"}) + collector.gauge("test", 42.0) + collector.histogram("test", 0.5, {"env": "test"}) + collector.timer("test", 1.5) + + +class TestDefaultRegistry: + """Test cases for default registry management.""" + + def test_get_default_registry(self): + """Test getting the default registry.""" + registry = get_default_registry() + assert isinstance(registry, MetricsRegistry) + + # Should return the same instance + registry2 = get_default_registry() + assert registry is registry2 + + def test_set_default_registry(self): + """Test setting a custom default registry.""" + original_registry = get_default_registry() + custom_registry = MetricsRegistry() + + set_default_registry(custom_registry) + + # Should return the custom registry + current_registry = get_default_registry() + assert current_registry is custom_registry + assert current_registry is not original_registry + + # Restore original for other tests + set_default_registry(original_registry) diff --git a/tests/cadence/_internal/visibility/test_prometheus.py b/tests/cadence/_internal/visibility/test_prometheus.py new file mode 100644 index 0000000..08415e9 --- /dev/null +++ b/tests/cadence/_internal/visibility/test_prometheus.py @@ -0,0 +1,281 @@ +"""Tests for Prometheus metrics integration.""" + +from unittest.mock import Mock, patch + + +from cadence._internal.visibility.prometheus import ( + CadenceMetrics, + PrometheusConfig, + PrometheusMetrics, +) + + +class TestPrometheusConfig: + """Test cases for PrometheusConfig.""" + + def test_default_config(self): + """Test default configuration values.""" + config = PrometheusConfig() + + assert config.enable_http_server is False + assert config.http_port == 8000 + assert config.http_addr == "0.0.0.0" + assert config.enable_push_gateway is False + assert config.push_gateway_url == "localhost:9091" + assert config.push_job_name == "cadence_client" + assert config.metric_prefix == "cadence_" + assert config.default_labels == {} + assert config.registry is None + + def test_custom_config(self): + """Test custom configuration values.""" + config = PrometheusConfig( + enable_http_server=True, + http_port=9000, + metric_prefix="my_cadence_", + default_labels={"env": "test"}, + ) + + assert config.enable_http_server is True + assert config.http_port == 9000 + assert config.metric_prefix == "my_cadence_" + assert config.default_labels == {"env": "test"} + + +class TestPrometheusMetrics: + """Test cases for PrometheusMetrics.""" + + @patch("cadence._internal.visibility.prometheus.start_http_server") + def test_initialization_with_http_server(self, mock_start_server): + """Test PrometheusMetrics initialization with HTTP server.""" + config = PrometheusConfig(enable_http_server=True, http_port=8001) + + metrics = PrometheusMetrics(config) + + assert metrics.config == config + mock_start_server.assert_called_once_with( + 8001, addr="0.0.0.0", registry=metrics.registry + ) + + def test_initialization_without_http_server(self): + """Test PrometheusMetrics initialization without HTTP server.""" + config = PrometheusConfig(enable_http_server=False) + + metrics = PrometheusMetrics(config) + + assert metrics.config == config + assert metrics._http_server is None + + def test_metric_name_with_prefix(self): + """Test metric name generation with prefix.""" + config = PrometheusConfig(metric_prefix="test_") + metrics = PrometheusMetrics(config) + + name = metrics._get_metric_name("counter") + assert name == "test_counter" + + def test_merge_labels(self): + """Test label merging with default labels.""" + config = PrometheusConfig(default_labels={"env": "test", "service": "cadence"}) + metrics = PrometheusMetrics(config) + + # Test with no additional labels + merged = metrics._merge_labels(None) + assert merged == {"env": "test", "service": "cadence"} + + # Test with additional labels + merged = metrics._merge_labels({"operation": "poll"}) + assert merged == {"env": "test", "service": "cadence", "operation": "poll"} + + # Test label override + merged = metrics._merge_labels({"env": "prod", "operation": "poll"}) + assert merged == {"env": "prod", "service": "cadence", "operation": "poll"} + + @patch("cadence._internal.visibility.prometheus.Counter") + def test_count(self, mock_counter_class): + """Test count metric (M3-aligned).""" + mock_counter = Mock() + mock_labeled_counter = Mock() + mock_counter.labels.return_value = mock_labeled_counter + mock_counter_class.return_value = mock_counter + + config = PrometheusConfig() + metrics = PrometheusMetrics(config) + + # Test without labels + metrics.count("test_counter", 2) + mock_counter.inc.assert_called_once_with(2) + + # Reset mock + mock_counter.reset_mock() + mock_labeled_counter.reset_mock() + + # Test with labels + metrics.count("test_counter", 1, {"env": "test"}) + mock_counter.labels.assert_called_once_with(env="test") + mock_labeled_counter.inc.assert_called_once_with(1) + + @patch("cadence._internal.visibility.prometheus.Gauge") + def test_gauge(self, mock_gauge_class): + """Test gauge metric (M3-aligned).""" + mock_gauge = Mock() + mock_labeled_gauge = Mock() + mock_gauge.labels.return_value = mock_labeled_gauge + mock_gauge_class.return_value = mock_gauge + + config = PrometheusConfig() + metrics = PrometheusMetrics(config) + + # Test without labels + metrics.gauge("test_gauge", 42.0) + mock_gauge.set.assert_called_once_with(42.0) + + # Reset mock + mock_gauge.reset_mock() + mock_labeled_gauge.reset_mock() + + # Test with labels + metrics.gauge("test_gauge", 100.0, {"env": "test"}) + mock_gauge.labels.assert_called_once_with(env="test") + mock_labeled_gauge.set.assert_called_once_with(100.0) + + @patch("cadence._internal.visibility.prometheus.Histogram") + def test_timing(self, mock_histogram_class): + """Test timing metric (M3-aligned).""" + mock_histogram = Mock() + mock_labeled_histogram = Mock() + mock_histogram.labels.return_value = mock_labeled_histogram + mock_histogram_class.return_value = mock_histogram + + config = PrometheusConfig() + metrics = PrometheusMetrics(config) + + # Test without labels + metrics.timing("test_timing", 0.5) + mock_histogram.observe.assert_called_once_with(0.5) + + # Reset mock + mock_histogram.reset_mock() + mock_labeled_histogram.reset_mock() + + # Test with labels + metrics.timing("test_timing", 1.0, {"env": "test"}) + mock_histogram.labels.assert_called_once_with(env="test") + mock_labeled_histogram.observe.assert_called_once_with(1.0) + + @patch("cadence._internal.visibility.prometheus.Histogram") + def test_histogram(self, mock_histogram_class): + """Test histogram metric (M3-aligned).""" + mock_histogram = Mock() + mock_labeled_histogram = Mock() + mock_histogram.labels.return_value = mock_labeled_histogram + mock_histogram_class.return_value = mock_histogram + + config = PrometheusConfig() + metrics = PrometheusMetrics(config) + + # Test without labels + metrics.histogram("test_histogram", 2.5) + mock_histogram.observe.assert_called_once_with(2.5) + + # Reset mock + mock_histogram.reset_mock() + mock_labeled_histogram.reset_mock() + + # Test with labels + metrics.histogram("test_histogram", 3.0, {"env": "test"}) + mock_histogram.labels.assert_called_once_with(env="test") + mock_labeled_histogram.observe.assert_called_once_with(3.0) + + @patch("cadence._internal.visibility.prometheus.push_to_gateway") + def test_push_to_gateway(self, mock_push): + """Test pushing metrics to gateway.""" + config = PrometheusConfig( + enable_push_gateway=True, + push_gateway_url="localhost:9091", + push_job_name="test_job", + ) + metrics = PrometheusMetrics(config) + + metrics.push_to_gateway() + + mock_push.assert_called_once_with( + "localhost:9091", job="test_job", registry=metrics.registry + ) + + def test_push_to_gateway_disabled(self): + """Test push to gateway when disabled.""" + config = PrometheusConfig(enable_push_gateway=False) + metrics = PrometheusMetrics(config) + + # Should not raise exception + metrics.push_to_gateway() + + @patch("cadence._internal.visibility.prometheus.generate_latest") + def test_get_metrics_text(self, mock_generate): + """Test getting metrics as text.""" + mock_generate.return_value = ( + b"# HELP test_counter Test counter\ntest_counter 1.0\n" + ) + + config = PrometheusConfig() + metrics = PrometheusMetrics(config) + + text = metrics.get_metrics_text() + + assert text == "# HELP test_counter Test counter\ntest_counter 1.0\n" + mock_generate.assert_called_once_with(metrics.registry) + + def test_shutdown(self): + """Test metrics shutdown.""" + config = PrometheusConfig(enable_http_server=False) + metrics = PrometheusMetrics(config) + + # Mock HTTP server + mock_server = Mock() + metrics._http_server = mock_server # type: ignore + + metrics.shutdown() + + mock_server.shutdown.assert_called_once() + assert metrics._http_server is None + + +class TestCadenceMetrics: + """Test cases for CadenceMetrics constants.""" + + def test_workflow_metrics(self): + """Test workflow metric names.""" + assert CadenceMetrics.WORKFLOW_STARTED_TOTAL == "workflow_started_total" + assert CadenceMetrics.WORKFLOW_COMPLETED_TOTAL == "workflow_completed_total" + assert CadenceMetrics.WORKFLOW_FAILED_TOTAL == "workflow_failed_total" + assert CadenceMetrics.WORKFLOW_DURATION_SECONDS == "workflow_duration_seconds" + + def test_activity_metrics(self): + """Test activity metric names.""" + assert CadenceMetrics.ACTIVITY_STARTED_TOTAL == "activity_started_total" + assert CadenceMetrics.ACTIVITY_COMPLETED_TOTAL == "activity_completed_total" + assert CadenceMetrics.ACTIVITY_FAILED_TOTAL == "activity_failed_total" + assert CadenceMetrics.ACTIVITY_DURATION_SECONDS == "activity_duration_seconds" + + def test_worker_metrics(self): + """Test worker metric names.""" + assert CadenceMetrics.WORKER_TASK_POLLS_TOTAL == "worker_task_polls_total" + assert ( + CadenceMetrics.WORKER_TASK_POLL_ERRORS_TOTAL + == "worker_task_poll_errors_total" + ) + assert CadenceMetrics.WORKER_ACTIVE_TASKS == "worker_active_tasks" + + def test_client_metrics(self): + """Test client metric names.""" + assert CadenceMetrics.CLIENT_REQUESTS_TOTAL == "client_requests_total" + assert ( + CadenceMetrics.CLIENT_REQUEST_DURATION_SECONDS + == "client_request_duration_seconds" + ) + assert ( + CadenceMetrics.CLIENT_REQUEST_ERRORS_TOTAL == "client_request_errors_total" + ) + + diff --git a/uv.lock b/uv.lock index ca5750c..117a1ef 100644 --- a/uv.lock +++ b/uv.lock @@ -155,6 +155,7 @@ dependencies = [ { name = "grpcio" }, { name = "grpcio-status" }, { name = "msgspec" }, + { name = "prometheus-client" }, { name = "protobuf" }, { name = "typing-extensions" }, ] @@ -195,6 +196,7 @@ requires-dist = [ { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.0.0" }, { name = "myst-parser", marker = "extra == 'docs'", specifier = ">=1.0.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.0.0" }, + { name = "prometheus-client", specifier = ">=0.21.0" }, { name = "protobuf", specifier = "==5.29.1" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.4.1" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.21.0" }, @@ -953,6 +955,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/74/a88bf1b1efeae488a0c0b7bdf71429c313722d1fc0f377537fbe554e6180/pre_commit-4.2.0-py2.py3-none-any.whl", hash = "sha256:a009ca7205f1eb497d10b845e52c838a98b6cdd2102a6c8e4540e94ee75c58bd", size = 220707, upload-time = "2025-03-18T21:35:19.343Z" }, ] +[[package]] +name = "prometheus-client" +version = "0.23.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/23/53/3edb5d68ecf6b38fcbcc1ad28391117d2a322d9a1a3eff04bfdb184d8c3b/prometheus_client-0.23.1.tar.gz", hash = "sha256:6ae8f9081eaaaf153a2e959d2e6c4f4fb57b12ef76c8c7980202f1e57b48b2ce", size = 80481, upload-time = "2025-09-18T20:47:25.043Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b8/db/14bafcb4af2139e046d03fd00dea7873e48eafe18b7d2797e73d6681f210/prometheus_client-0.23.1-py3-none-any.whl", hash = "sha256:dd1913e6e76b59cfe44e7a4b83e01afc9873c1bdfd2ed8739f1e76aeca115f99", size = 61145, upload-time = "2025-09-18T20:47:23.875Z" }, +] + [[package]] name = "propcache" version = "0.3.2" From 43aac5dd9ca749d9d5cf78a6e343b56be679248f Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 29 Sep 2025 09:19:10 -0700 Subject: [PATCH 2/5] change naming and combine metricsLogic Signed-off-by: Tim Li --- .cursorrules | 10 +- cadence/_internal/visibility/__init__.py | 7 +- cadence/_internal/visibility/metrics.py | 90 +++--------- cadence/_internal/visibility/prometheus.py | 16 +-- .../_internal/visibility/test_metrics.py | 136 ++++++------------ .../_internal/visibility/test_prometheus.py | 20 +-- 6 files changed, 91 insertions(+), 188 deletions(-) diff --git a/.cursorrules b/.cursorrules index a4e375b..bde878e 100644 --- a/.cursorrules +++ b/.cursorrules @@ -35,7 +35,15 @@ pip install -e ".[dev]" - Use `uv run python scripts/dev.py` for development tasks ## Code Quality +- **ALWAYS run linter and type checker after making code changes** - Run linter with auto-fix: `uv tool run ruff check --fix` - Run type checking: `uv tool run mypy cadence/` -- Always run both commands before committing code changes - Use `uv tool run ruff check --fix && uv tool run mypy cadence/` to run both together +- **Standard workflow**: Make changes → Run linter → Run type checker → Commit + +## Development Workflow +1. Make code changes +2. Run `uv tool run ruff check --fix` (fixes formatting and linting issues) +3. Run `uv tool run mypy cadence/` (checks type safety) +4. Run `uv run python -m pytest` (run tests) +5. Commit changes diff --git a/cadence/_internal/visibility/__init__.py b/cadence/_internal/visibility/__init__.py index 70fe020..3966af2 100644 --- a/cadence/_internal/visibility/__init__.py +++ b/cadence/_internal/visibility/__init__.py @@ -1,11 +1,12 @@ """Visibility and metrics collection components for Cadence client.""" -from .metrics import MetricsRegistry, get_default_registry +from .metrics import MetricsHandler, NoOpMetricsHandler, get_default_handler from .prometheus import PrometheusMetrics, PrometheusConfig __all__ = [ - "MetricsRegistry", - "get_default_registry", + "MetricsHandler", + "NoOpMetricsHandler", + "get_default_handler", "PrometheusMetrics", "PrometheusConfig", ] diff --git a/cadence/_internal/visibility/metrics.py b/cadence/_internal/visibility/metrics.py index 5ecefb1..bcf68ee 100644 --- a/cadence/_internal/visibility/metrics.py +++ b/cadence/_internal/visibility/metrics.py @@ -2,7 +2,7 @@ import logging from enum import Enum -from typing import Dict, Optional, Protocol, Set +from typing import Dict, Optional, Protocol logger = logging.getLogger(__name__) @@ -16,7 +16,7 @@ class MetricType(Enum): SUMMARY = "summary" -class MetricsCollector(Protocol): +class MetricsHandler(Protocol): """Protocol for metrics collection backends.""" def counter( @@ -44,8 +44,8 @@ def histogram( ... -class NoOpMetricsCollector: - """No-op metrics collector that discards all metrics.""" +class NoOpMetricsHandler: + """No-op metrics handler that discards all metrics.""" def counter( self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None @@ -68,77 +68,19 @@ def histogram( pass -class MetricsRegistry: - """Registry for managing metrics collection in the Cadence client.""" +# Global default handler +_default_handler: Optional[MetricsHandler] = None - def __init__(self, collector: Optional[MetricsCollector] = None): - self._collector = collector or NoOpMetricsCollector() - self._registered_metrics: Set[str] = set() - def set_collector(self, collector: MetricsCollector) -> None: - """Set the metrics collector backend.""" - self._collector = collector - logger.info(f"Metrics collector set to {type(collector).__name__}") +def get_default_handler() -> MetricsHandler: + """Get the default global metrics handler.""" + global _default_handler + if _default_handler is None: + _default_handler = NoOpMetricsHandler() + return _default_handler - def register_metric(self, name: str, metric_type: MetricType) -> None: - """Register a metric with the registry.""" - if name in self._registered_metrics: - logger.warning(f"Metric {name} already registered") - return - self._registered_metrics.add(name) - logger.debug(f"Registered {metric_type.value} metric: {name}") - - def counter( - self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None - ) -> None: - """Send a counter metric.""" - try: - self._collector.counter(key, n, tags) - except Exception as e: - logger.error(f"Failed to send counter {key}: {e}") - - def gauge( - self, key: str, value: float, tags: Optional[Dict[str, str]] = None - ) -> None: - """Send a gauge metric.""" - try: - self._collector.gauge(key, value, tags) - except Exception as e: - logger.error(f"Failed to send gauge {key}: {e}") - - def timer( - self, key: str, duration: float, tags: Optional[Dict[str, str]] = None - ) -> None: - """Send a timer metric.""" - try: - self._collector.timer(key, duration, tags) - except Exception as e: - logger.error(f"Failed to send timer {key}: {e}") - - def histogram( - self, key: str, value: float, tags: Optional[Dict[str, str]] = None - ) -> None: - """Send a histogram metric.""" - try: - self._collector.histogram(key, value, tags) - except Exception as e: - logger.error(f"Failed to send histogram {key}: {e}") - - -# Global default registry -_default_registry: Optional[MetricsRegistry] = None - - -def get_default_registry() -> MetricsRegistry: - """Get the default global metrics registry.""" - global _default_registry - if _default_registry is None: - _default_registry = MetricsRegistry() - return _default_registry - - -def set_default_registry(registry: MetricsRegistry) -> None: - """Set the default global metrics registry.""" - global _default_registry - _default_registry = registry +def set_default_handler(handler: MetricsHandler) -> None: + """Set the default global metrics handler.""" + global _default_handler + _default_handler = handler diff --git a/cadence/_internal/visibility/prometheus.py b/cadence/_internal/visibility/prometheus.py index 98b8500..79efe64 100644 --- a/cadence/_internal/visibility/prometheus.py +++ b/cadence/_internal/visibility/prometheus.py @@ -146,10 +146,10 @@ def _get_or_create_summary( return self._summaries[metric_name] - def count( + def counter( self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None ) -> None: - """Send a count metric (aligned with M3.count).""" + """Send a counter metric.""" try: counter = self._get_or_create_counter(key, tags) merged_tags = self._merge_labels(tags) @@ -160,12 +160,12 @@ def count( counter.inc(n) except Exception as e: - logger.error(f"Failed to send count {key}: {e}") + logger.error(f"Failed to send counter {key}: {e}") def gauge( self, key: str, value: float, tags: Optional[Dict[str, str]] = None ) -> None: - """Send a gauge metric (aligned with M3.gauge).""" + """Send a gauge metric.""" try: gauge = self._get_or_create_gauge(key, tags) merged_tags = self._merge_labels(tags) @@ -178,10 +178,10 @@ def gauge( except Exception as e: logger.error(f"Failed to send gauge {key}: {e}") - def timing( + def timer( self, key: str, duration: float, tags: Optional[Dict[str, str]] = None ) -> None: - """Send a timing metric (aligned with M3.timing) - implemented as histogram.""" + """Send a timer metric - implemented as histogram.""" try: histogram = self._get_or_create_histogram(key, tags) merged_tags = self._merge_labels(tags) @@ -192,12 +192,12 @@ def timing( histogram.observe(duration) except Exception as e: - logger.error(f"Failed to send timing {key}: {e}") + logger.error(f"Failed to send timer {key}: {e}") def histogram( self, key: str, value: float, tags: Optional[Dict[str, str]] = None ) -> None: - """Send a histogram metric (aligned with M3.histogram).""" + """Send a histogram metric.""" try: histogram = self._get_or_create_histogram(key, tags) merged_tags = self._merge_labels(tags) diff --git a/tests/cadence/_internal/visibility/test_metrics.py b/tests/cadence/_internal/visibility/test_metrics.py index 9183e39..f2a58bc 100644 --- a/tests/cadence/_internal/visibility/test_metrics.py +++ b/tests/cadence/_internal/visibility/test_metrics.py @@ -4,126 +4,78 @@ from cadence._internal.visibility.metrics import ( - MetricsRegistry, - MetricType, - NoOpMetricsCollector, - get_default_registry, - set_default_registry, + MetricsHandler, + NoOpMetricsHandler, + get_default_handler, + set_default_handler, ) -class TestMetricsRegistry: - """Test cases for MetricsRegistry.""" +class TestMetricsHandler: + """Test cases for MetricsHandler protocol.""" - def test_registry_with_no_collector(self): - """Test registry with default no-op collector.""" - registry = MetricsRegistry() + def test_noop_handler(self): + """Test no-op handler doesn't raise exceptions.""" + handler = NoOpMetricsHandler() # Should not raise any exceptions - registry.counter("test_counter", 1) - registry.gauge("test_gauge", 42.0) - registry.histogram("test_histogram", 0.5) - registry.timer("test_timing", 1.5) + handler.counter("test_counter", 1) + handler.gauge("test_gauge", 42.0) + handler.histogram("test_histogram", 0.5) + handler.timer("test_timing", 1.5) - def test_registry_with_mock_collector(self): - """Test registry with mock collector.""" - mock_collector = Mock() - registry = MetricsRegistry(mock_collector) + def test_mock_handler(self): + """Test mock handler implementation.""" + mock_handler = Mock(spec=MetricsHandler) # Test counter - registry.counter("test_counter", 2, {"label": "value"}) - mock_collector.counter.assert_called_once_with( + mock_handler.counter("test_counter", 2, {"label": "value"}) + mock_handler.counter.assert_called_once_with( "test_counter", 2, {"label": "value"} ) # Test gauge - registry.gauge("test_gauge", 100.0, {"env": "test"}) - mock_collector.gauge.assert_called_once_with( + mock_handler.gauge("test_gauge", 100.0, {"env": "test"}) + mock_handler.gauge.assert_called_once_with( "test_gauge", 100.0, {"env": "test"} ) # Test timer - registry.timer("test_timing", 0.75) - mock_collector.timer.assert_called_once_with( - "test_timing", 0.75, None + mock_handler.timer("test_timing", 0.75, {"tag": "value"}) + mock_handler.timer.assert_called_once_with( + "test_timing", 0.75, {"tag": "value"} ) # Test histogram - registry.histogram("test_histogram", 2.5) - mock_collector.histogram.assert_called_once_with( - "test_histogram", 2.5, None + mock_handler.histogram("test_histogram", 2.5, {"env": "prod"}) + mock_handler.histogram.assert_called_once_with( + "test_histogram", 2.5, {"env": "prod"} ) - def test_set_collector(self): - """Test setting a new collector.""" - registry = MetricsRegistry() - mock_collector = Mock() - registry.set_collector(mock_collector) - registry.counter("test", 1) +class TestDefaultHandler: + """Test cases for default handler management.""" - mock_collector.counter.assert_called_once_with("test", 1, None) - - def test_register_metric(self): - """Test metric registration.""" - registry = MetricsRegistry() - - registry.register_metric("test_counter", MetricType.COUNTER) - registry.register_metric("test_gauge", MetricType.GAUGE) - - # Registering the same metric twice should not raise an error - registry.register_metric("test_counter", MetricType.COUNTER) - - def test_collector_exception_handling(self): - """Test that collector exceptions are handled gracefully.""" - mock_collector = Mock() - mock_collector.counter.side_effect = Exception("Test exception") - - registry = MetricsRegistry(mock_collector) - - # Should not raise exception, but log error - registry.counter("test", 1) - - mock_collector.counter.assert_called_once() - - -class TestNoOpMetricsCollector: - """Test cases for NoOpMetricsCollector.""" - - def test_no_op_collector(self): - """Test that no-op collector doesn't raise exceptions.""" - collector = NoOpMetricsCollector() - - # Should not raise any exceptions - collector.counter("test", 1, {"label": "value"}) - collector.gauge("test", 42.0) - collector.histogram("test", 0.5, {"env": "test"}) - collector.timer("test", 1.5) - - -class TestDefaultRegistry: - """Test cases for default registry management.""" - - def test_get_default_registry(self): - """Test getting the default registry.""" - registry = get_default_registry() - assert isinstance(registry, MetricsRegistry) + def test_get_default_handler(self): + """Test getting the default handler.""" + handler = get_default_handler() + assert isinstance(handler, NoOpMetricsHandler) # Should return the same instance - registry2 = get_default_registry() - assert registry is registry2 + handler2 = get_default_handler() + assert handler is handler2 - def test_set_default_registry(self): - """Test setting a custom default registry.""" - original_registry = get_default_registry() - custom_registry = MetricsRegistry() + def test_set_default_handler(self): + """Test setting a custom default handler.""" + original_handler = get_default_handler() + custom_handler = NoOpMetricsHandler() - set_default_registry(custom_registry) + set_default_handler(custom_handler) - # Should return the custom registry - current_registry = get_default_registry() - assert current_registry is custom_registry - assert current_registry is not original_registry + # Should return the custom handler + current_handler = get_default_handler() + assert current_handler is custom_handler + assert current_handler is not original_handler # Restore original for other tests - set_default_registry(original_registry) + set_default_handler(original_handler) diff --git a/tests/cadence/_internal/visibility/test_prometheus.py b/tests/cadence/_internal/visibility/test_prometheus.py index 08415e9..4139136 100644 --- a/tests/cadence/_internal/visibility/test_prometheus.py +++ b/tests/cadence/_internal/visibility/test_prometheus.py @@ -92,8 +92,8 @@ def test_merge_labels(self): assert merged == {"env": "prod", "service": "cadence", "operation": "poll"} @patch("cadence._internal.visibility.prometheus.Counter") - def test_count(self, mock_counter_class): - """Test count metric (M3-aligned).""" + def test_counter(self, mock_counter_class): + """Test counter metric.""" mock_counter = Mock() mock_labeled_counter = Mock() mock_counter.labels.return_value = mock_labeled_counter @@ -103,7 +103,7 @@ def test_count(self, mock_counter_class): metrics = PrometheusMetrics(config) # Test without labels - metrics.count("test_counter", 2) + metrics.counter("test_counter", 2) mock_counter.inc.assert_called_once_with(2) # Reset mock @@ -111,13 +111,13 @@ def test_count(self, mock_counter_class): mock_labeled_counter.reset_mock() # Test with labels - metrics.count("test_counter", 1, {"env": "test"}) + metrics.counter("test_counter", 1, {"env": "test"}) mock_counter.labels.assert_called_once_with(env="test") mock_labeled_counter.inc.assert_called_once_with(1) @patch("cadence._internal.visibility.prometheus.Gauge") def test_gauge(self, mock_gauge_class): - """Test gauge metric (M3-aligned).""" + """Test gauge metric.""" mock_gauge = Mock() mock_labeled_gauge = Mock() mock_gauge.labels.return_value = mock_labeled_gauge @@ -140,8 +140,8 @@ def test_gauge(self, mock_gauge_class): mock_labeled_gauge.set.assert_called_once_with(100.0) @patch("cadence._internal.visibility.prometheus.Histogram") - def test_timing(self, mock_histogram_class): - """Test timing metric (M3-aligned).""" + def test_timer(self, mock_histogram_class): + """Test timer metric.""" mock_histogram = Mock() mock_labeled_histogram = Mock() mock_histogram.labels.return_value = mock_labeled_histogram @@ -151,7 +151,7 @@ def test_timing(self, mock_histogram_class): metrics = PrometheusMetrics(config) # Test without labels - metrics.timing("test_timing", 0.5) + metrics.timer("test_timer", 0.5) mock_histogram.observe.assert_called_once_with(0.5) # Reset mock @@ -159,13 +159,13 @@ def test_timing(self, mock_histogram_class): mock_labeled_histogram.reset_mock() # Test with labels - metrics.timing("test_timing", 1.0, {"env": "test"}) + metrics.timer("test_timer", 1.0, {"env": "test"}) mock_histogram.labels.assert_called_once_with(env="test") mock_labeled_histogram.observe.assert_called_once_with(1.0) @patch("cadence._internal.visibility.prometheus.Histogram") def test_histogram(self, mock_histogram_class): - """Test histogram metric (M3-aligned).""" + """Test histogram metric.""" mock_histogram = Mock() mock_labeled_histogram = Mock() mock_histogram.labels.return_value = mock_labeled_histogram From bedee71951fd427fbd1265a0a727ed238e05eb0c Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 29 Sep 2025 09:24:12 -0700 Subject: [PATCH 3/5] naming Signed-off-by: Tim Li --- cadence/_internal/visibility/metrics.py | 2 +- cadence/_internal/visibility/prometheus.py | 24 +++++++++---------- .../_internal/visibility/test_metrics.py | 12 ++++++++++ 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/cadence/_internal/visibility/metrics.py b/cadence/_internal/visibility/metrics.py index bcf68ee..d5882f1 100644 --- a/cadence/_internal/visibility/metrics.py +++ b/cadence/_internal/visibility/metrics.py @@ -13,7 +13,7 @@ class MetricType(Enum): COUNTER = "counter" GAUGE = "gauge" HISTOGRAM = "histogram" - SUMMARY = "summary" + TIMER = "timer" class MetricsHandler(Protocol): diff --git a/cadence/_internal/visibility/prometheus.py b/cadence/_internal/visibility/prometheus.py index 79efe64..cc3a2f2 100644 --- a/cadence/_internal/visibility/prometheus.py +++ b/cadence/_internal/visibility/prometheus.py @@ -10,7 +10,6 @@ Counter, Gauge, Histogram, - Summary, generate_latest, push_to_gateway, start_http_server, @@ -55,7 +54,6 @@ def __init__(self, config: Optional[PrometheusConfig] = None): self._counters: Dict[str, Counter] = {} self._gauges: Dict[str, Gauge] = {} self._histograms: Dict[str, Histogram] = {} - self._summaries: Dict[str, Summary] = {} # HTTP server handle self._http_server: Optional[Any] = None @@ -128,23 +126,23 @@ def _get_or_create_histogram( return self._histograms[metric_name] - def _get_or_create_summary( + def _get_or_create_timer( self, name: str, labels: Optional[Dict[str, str]] - ) -> Summary: - """Get or create a Summary metric.""" + ) -> Histogram: + """Get or create a timer metric (implemented as histogram).""" metric_name = self._get_metric_name(name) - if metric_name not in self._summaries: + if metric_name not in self._histograms: label_names = list(self._merge_labels(labels).keys()) if labels else [] - self._summaries[metric_name] = Summary( + self._histograms[metric_name] = Histogram( metric_name, - f"Summary metric for {name}", + f"Timer metric for {name}", labelnames=label_names, registry=self.registry, ) - logger.debug(f"Created summary metric: {metric_name}") + logger.debug(f"Created timer metric: {metric_name}") - return self._summaries[metric_name] + return self._histograms[metric_name] def counter( self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None @@ -183,13 +181,13 @@ def timer( ) -> None: """Send a timer metric - implemented as histogram.""" try: - histogram = self._get_or_create_histogram(key, tags) + timer = self._get_or_create_timer(key, tags) merged_tags = self._merge_labels(tags) if merged_tags: - histogram.labels(**merged_tags).observe(duration) + timer.labels(**merged_tags).observe(duration) else: - histogram.observe(duration) + timer.observe(duration) except Exception as e: logger.error(f"Failed to send timer {key}: {e}") diff --git a/tests/cadence/_internal/visibility/test_metrics.py b/tests/cadence/_internal/visibility/test_metrics.py index f2a58bc..3b22e08 100644 --- a/tests/cadence/_internal/visibility/test_metrics.py +++ b/tests/cadence/_internal/visibility/test_metrics.py @@ -5,6 +5,7 @@ from cadence._internal.visibility.metrics import ( MetricsHandler, + MetricType, NoOpMetricsHandler, get_default_handler, set_default_handler, @@ -79,3 +80,14 @@ def test_set_default_handler(self): # Restore original for other tests set_default_handler(original_handler) + + +class TestMetricType: + """Test cases for MetricType enum.""" + + def test_metric_type_values(self): + """Test that MetricType enum has correct values.""" + assert MetricType.COUNTER.value == "counter" + assert MetricType.GAUGE.value == "gauge" + assert MetricType.HISTOGRAM.value == "histogram" + assert MetricType.TIMER.value == "timer" From 4b57026e2fdec91ee874443eb6ff55e3ca4a94e5 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 29 Sep 2025 16:31:06 -0700 Subject: [PATCH 4/5] respond to comments Signed-off-by: Tim Li --- cadence/_internal/visibility/__init__.py | 10 +- cadence/_internal/visibility/metrics.py | 32 +- cadence/_internal/visibility/prometheus.py | 107 +----- cadence/client.py | 7 + .../_internal/visibility/test_metrics.py | 73 +--- .../_internal/visibility/test_prometheus.py | 343 +++++++----------- 6 files changed, 163 insertions(+), 409 deletions(-) diff --git a/cadence/_internal/visibility/__init__.py b/cadence/_internal/visibility/__init__.py index 3966af2..989ceee 100644 --- a/cadence/_internal/visibility/__init__.py +++ b/cadence/_internal/visibility/__init__.py @@ -1,12 +1,12 @@ """Visibility and metrics collection components for Cadence client.""" -from .metrics import MetricsHandler, NoOpMetricsHandler, get_default_handler -from .prometheus import PrometheusMetrics, PrometheusConfig +from .metrics import MetricsEmitter, NoOpMetricsEmitter +from .prometheus import PrometheusMetrics, PrometheusConfig, CadenceMetrics __all__ = [ - "MetricsHandler", - "NoOpMetricsHandler", - "get_default_handler", + "MetricsEmitter", + "NoOpMetricsEmitter", "PrometheusMetrics", "PrometheusConfig", + "CadenceMetrics", ] diff --git a/cadence/_internal/visibility/metrics.py b/cadence/_internal/visibility/metrics.py index d5882f1..d2ca3d2 100644 --- a/cadence/_internal/visibility/metrics.py +++ b/cadence/_internal/visibility/metrics.py @@ -13,10 +13,9 @@ class MetricType(Enum): COUNTER = "counter" GAUGE = "gauge" HISTOGRAM = "histogram" - TIMER = "timer" -class MetricsHandler(Protocol): +class MetricsEmitter(Protocol): """Protocol for metrics collection backends.""" def counter( @@ -31,11 +30,6 @@ def gauge( """Send a gauge metric.""" ... - def timer( - self, key: str, duration: float, tags: Optional[Dict[str, str]] = None - ) -> None: - """Send a timer metric.""" - ... def histogram( self, key: str, value: float, tags: Optional[Dict[str, str]] = None @@ -44,8 +38,8 @@ def histogram( ... -class NoOpMetricsHandler: - """No-op metrics handler that discards all metrics.""" +class NoOpMetricsEmitter: + """No-op metrics emitter that discards all metrics.""" def counter( self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None @@ -57,10 +51,6 @@ def gauge( ) -> None: pass - def timer( - self, key: str, duration: float, tags: Optional[Dict[str, str]] = None - ) -> None: - pass def histogram( self, key: str, value: float, tags: Optional[Dict[str, str]] = None @@ -68,19 +58,3 @@ def histogram( pass -# Global default handler -_default_handler: Optional[MetricsHandler] = None - - -def get_default_handler() -> MetricsHandler: - """Get the default global metrics handler.""" - global _default_handler - if _default_handler is None: - _default_handler = NoOpMetricsHandler() - return _default_handler - - -def set_default_handler(handler: MetricsHandler) -> None: - """Set the default global metrics handler.""" - global _default_handler - _default_handler = handler diff --git a/cadence/_internal/visibility/prometheus.py b/cadence/_internal/visibility/prometheus.py index cc3a2f2..66cab93 100644 --- a/cadence/_internal/visibility/prometheus.py +++ b/cadence/_internal/visibility/prometheus.py @@ -2,7 +2,8 @@ import logging from dataclasses import dataclass, field -from typing import Dict, Optional, Any +from enum import Enum +from typing import Dict, Optional from prometheus_client import ( # type: ignore[import-not-found] REGISTRY, @@ -11,10 +12,10 @@ Gauge, Histogram, generate_latest, - push_to_gateway, - start_http_server, ) +from cadence._internal.visibility.metrics import MetricsEmitter + logger = logging.getLogger(__name__) @@ -23,16 +24,6 @@ class PrometheusConfig: """Configuration for Prometheus metrics.""" - # HTTP server configuration - enable_http_server: bool = False - http_port: int = 8000 - http_addr: str = "0.0.0.0" - - # Push gateway configuration - enable_push_gateway: bool = False - push_gateway_url: str = "localhost:9091" - push_job_name: str = "cadence_client" - # Metric name prefix metric_prefix: str = "cadence_" @@ -43,7 +34,7 @@ class PrometheusConfig: registry: Optional[CollectorRegistry] = None -class PrometheusMetrics: +class PrometheusMetrics(MetricsEmitter): """Prometheus metrics collector implementation.""" def __init__(self, config: Optional[PrometheusConfig] = None): @@ -55,12 +46,6 @@ def __init__(self, config: Optional[PrometheusConfig] = None): self._gauges: Dict[str, Gauge] = {} self._histograms: Dict[str, Histogram] = {} - # HTTP server handle - self._http_server: Optional[Any] = None - - if self.config.enable_http_server: - self.start_http_server() - def _get_metric_name(self, name: str) -> str: """Get the full metric name with prefix.""" return f"{self.config.metric_prefix}{name}" @@ -126,23 +111,6 @@ def _get_or_create_histogram( return self._histograms[metric_name] - def _get_or_create_timer( - self, name: str, labels: Optional[Dict[str, str]] - ) -> Histogram: - """Get or create a timer metric (implemented as histogram).""" - metric_name = self._get_metric_name(name) - - if metric_name not in self._histograms: - label_names = list(self._merge_labels(labels).keys()) if labels else [] - self._histograms[metric_name] = Histogram( - metric_name, - f"Timer metric for {name}", - labelnames=label_names, - registry=self.registry, - ) - logger.debug(f"Created timer metric: {metric_name}") - - return self._histograms[metric_name] def counter( self, key: str, n: int = 1, tags: Optional[Dict[str, str]] = None @@ -176,21 +144,6 @@ def gauge( except Exception as e: logger.error(f"Failed to send gauge {key}: {e}") - def timer( - self, key: str, duration: float, tags: Optional[Dict[str, str]] = None - ) -> None: - """Send a timer metric - implemented as histogram.""" - try: - timer = self._get_or_create_timer(key, tags) - merged_tags = self._merge_labels(tags) - - if merged_tags: - timer.labels(**merged_tags).observe(duration) - else: - timer.observe(duration) - - except Exception as e: - logger.error(f"Failed to send timer {key}: {e}") def histogram( self, key: str, value: float, tags: Optional[Dict[str, str]] = None @@ -208,44 +161,6 @@ def histogram( except Exception as e: logger.error(f"Failed to send histogram {key}: {e}") - def start_http_server(self) -> None: - """Start HTTP server to expose metrics.""" - if self._http_server is not None: - logger.warning("HTTP server already started") - return - - try: - server_result = start_http_server( - self.config.http_port, - addr=self.config.http_addr, - registry=self.registry, - ) - self._http_server = server_result - logger.info( - f"Prometheus metrics HTTP server started on " - f"{self.config.http_addr}:{self.config.http_port}" - ) - except Exception as e: - logger.error(f"Failed to start HTTP server: {e}") - raise - - def push_to_gateway(self) -> None: - """Push metrics to Prometheus Push Gateway.""" - if not self.config.enable_push_gateway: - logger.warning("Push gateway not enabled") - return - - try: - push_to_gateway( - self.config.push_gateway_url, - job=self.config.push_job_name, - registry=self.registry, - ) - logger.debug(f"Pushed metrics to gateway: {self.config.push_gateway_url}") - except Exception as e: - logger.error(f"Failed to push to gateway: {e}") - raise - def get_metrics_text(self) -> str: """Get metrics in Prometheus text format.""" try: @@ -255,19 +170,9 @@ def get_metrics_text(self) -> str: logger.error(f"Failed to generate metrics text: {e}") return "" - def shutdown(self) -> None: - """Shutdown the metrics collector.""" - if self._http_server: - try: - self._http_server.shutdown() - self._http_server = None - logger.info("Prometheus HTTP server shutdown") - except Exception as e: - logger.error(f"Failed to shutdown HTTP server: {e}") - # Default Cadence metrics names -class CadenceMetrics: +class CadenceMetrics(Enum): """Standard Cadence client metrics.""" # Workflow metrics diff --git a/cadence/client.py b/cadence/client.py index 10abc39..8159995 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -12,6 +12,7 @@ from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub from cadence.data_converter import DataConverter, DefaultDataConverter +from cadence._internal.visibility.metrics import MetricsEmitter, NoOpMetricsEmitter class ClientOptions(TypedDict, total=False): @@ -24,6 +25,7 @@ class ClientOptions(TypedDict, total=False): channel_arguments: dict[str, Any] credentials: ChannelCredentials | None compression: Compression + metrics_emitter: MetricsEmitter interceptors: list[ClientInterceptor] _DEFAULT_OPTIONS: ClientOptions = { @@ -34,6 +36,7 @@ class ClientOptions(TypedDict, total=False): "channel_arguments": {}, "credentials": None, "compression": Compression.NoCompression, + "metrics_emitter": NoOpMetricsEmitter(), "interceptors": [], } @@ -69,6 +72,10 @@ def worker_stub(self) -> WorkerAPIStub: def workflow_stub(self) -> WorkflowAPIStub: return self._workflow_stub + @property + def metrics_emitter(self) -> MetricsEmitter: + return self._options["metrics_emitter"] + async def ready(self) -> None: await self._channel.channel_ready() diff --git a/tests/cadence/_internal/visibility/test_metrics.py b/tests/cadence/_internal/visibility/test_metrics.py index 3b22e08..2924969 100644 --- a/tests/cadence/_internal/visibility/test_metrics.py +++ b/tests/cadence/_internal/visibility/test_metrics.py @@ -4,82 +4,48 @@ from cadence._internal.visibility.metrics import ( - MetricsHandler, + MetricsEmitter, MetricType, - NoOpMetricsHandler, - get_default_handler, - set_default_handler, + NoOpMetricsEmitter, ) -class TestMetricsHandler: - """Test cases for MetricsHandler protocol.""" +class TestMetricsEmitter: + """Test cases for MetricsEmitter protocol.""" - def test_noop_handler(self): - """Test no-op handler doesn't raise exceptions.""" - handler = NoOpMetricsHandler() + def test_noop_emitter(self): + """Test no-op emitter doesn't raise exceptions.""" + emitter = NoOpMetricsEmitter() # Should not raise any exceptions - handler.counter("test_counter", 1) - handler.gauge("test_gauge", 42.0) - handler.histogram("test_histogram", 0.5) - handler.timer("test_timing", 1.5) + emitter.counter("test_counter", 1) + emitter.gauge("test_gauge", 42.0) + emitter.histogram("test_histogram", 0.5) - def test_mock_handler(self): - """Test mock handler implementation.""" - mock_handler = Mock(spec=MetricsHandler) + def test_mock_emitter(self): + """Test mock emitter implementation.""" + mock_emitter = Mock(spec=MetricsEmitter) # Test counter - mock_handler.counter("test_counter", 2, {"label": "value"}) - mock_handler.counter.assert_called_once_with( + mock_emitter.counter("test_counter", 2, {"label": "value"}) + mock_emitter.counter.assert_called_once_with( "test_counter", 2, {"label": "value"} ) # Test gauge - mock_handler.gauge("test_gauge", 100.0, {"env": "test"}) - mock_handler.gauge.assert_called_once_with( + mock_emitter.gauge("test_gauge", 100.0, {"env": "test"}) + mock_emitter.gauge.assert_called_once_with( "test_gauge", 100.0, {"env": "test"} ) - # Test timer - mock_handler.timer("test_timing", 0.75, {"tag": "value"}) - mock_handler.timer.assert_called_once_with( - "test_timing", 0.75, {"tag": "value"} - ) # Test histogram - mock_handler.histogram("test_histogram", 2.5, {"env": "prod"}) - mock_handler.histogram.assert_called_once_with( + mock_emitter.histogram("test_histogram", 2.5, {"env": "prod"}) + mock_emitter.histogram.assert_called_once_with( "test_histogram", 2.5, {"env": "prod"} ) -class TestDefaultHandler: - """Test cases for default handler management.""" - - def test_get_default_handler(self): - """Test getting the default handler.""" - handler = get_default_handler() - assert isinstance(handler, NoOpMetricsHandler) - - # Should return the same instance - handler2 = get_default_handler() - assert handler is handler2 - - def test_set_default_handler(self): - """Test setting a custom default handler.""" - original_handler = get_default_handler() - custom_handler = NoOpMetricsHandler() - - set_default_handler(custom_handler) - - # Should return the custom handler - current_handler = get_default_handler() - assert current_handler is custom_handler - assert current_handler is not original_handler - - # Restore original for other tests - set_default_handler(original_handler) class TestMetricType: @@ -90,4 +56,3 @@ def test_metric_type_values(self): assert MetricType.COUNTER.value == "counter" assert MetricType.GAUGE.value == "gauge" assert MetricType.HISTOGRAM.value == "histogram" - assert MetricType.TIMER.value == "timer" diff --git a/tests/cadence/_internal/visibility/test_prometheus.py b/tests/cadence/_internal/visibility/test_prometheus.py index 4139136..f16bd3e 100644 --- a/tests/cadence/_internal/visibility/test_prometheus.py +++ b/tests/cadence/_internal/visibility/test_prometheus.py @@ -2,11 +2,10 @@ from unittest.mock import Mock, patch - from cadence._internal.visibility.prometheus import ( - CadenceMetrics, - PrometheusConfig, PrometheusMetrics, + PrometheusConfig, + CadenceMetrics, ) @@ -16,266 +15,170 @@ class TestPrometheusConfig: def test_default_config(self): """Test default configuration values.""" config = PrometheusConfig() - - assert config.enable_http_server is False - assert config.http_port == 8000 - assert config.http_addr == "0.0.0.0" - assert config.enable_push_gateway is False - assert config.push_gateway_url == "localhost:9091" - assert config.push_job_name == "cadence_client" assert config.metric_prefix == "cadence_" assert config.default_labels == {} assert config.registry is None def test_custom_config(self): """Test custom configuration values.""" + from prometheus_client import CollectorRegistry + + registry = CollectorRegistry() config = PrometheusConfig( - enable_http_server=True, - http_port=9000, - metric_prefix="my_cadence_", + metric_prefix="my_", default_labels={"env": "test"}, + registry=registry ) - - assert config.enable_http_server is True - assert config.http_port == 9000 - assert config.metric_prefix == "my_cadence_" + assert config.metric_prefix == "my_" assert config.default_labels == {"env": "test"} + assert config.registry is registry class TestPrometheusMetrics: """Test cases for PrometheusMetrics.""" - @patch("cadence._internal.visibility.prometheus.start_http_server") - def test_initialization_with_http_server(self, mock_start_server): - """Test PrometheusMetrics initialization with HTTP server.""" - config = PrometheusConfig(enable_http_server=True, http_port=8001) - - metrics = PrometheusMetrics(config) - - assert metrics.config == config - mock_start_server.assert_called_once_with( - 8001, addr="0.0.0.0", registry=metrics.registry + def test_init_with_default_config(self): + """Test initialization with default config.""" + metrics = PrometheusMetrics() + assert metrics.config.metric_prefix == "cadence_" + assert metrics.registry is not None + + def test_init_with_custom_config(self): + """Test initialization with custom config.""" + from prometheus_client import CollectorRegistry + + registry = CollectorRegistry() + config = PrometheusConfig( + metric_prefix="custom_", + default_labels={"service": "test"}, + registry=registry ) - - def test_initialization_without_http_server(self): - """Test PrometheusMetrics initialization without HTTP server.""" - config = PrometheusConfig(enable_http_server=False) - - metrics = PrometheusMetrics(config) - - assert metrics.config == config - assert metrics._http_server is None - - def test_metric_name_with_prefix(self): - """Test metric name generation with prefix.""" - config = PrometheusConfig(metric_prefix="test_") - metrics = PrometheusMetrics(config) - - name = metrics._get_metric_name("counter") - assert name == "test_counter" - - def test_merge_labels(self): - """Test label merging with default labels.""" - config = PrometheusConfig(default_labels={"env": "test", "service": "cadence"}) metrics = PrometheusMetrics(config) + assert metrics.config.metric_prefix == "custom_" + assert metrics.registry is registry - # Test with no additional labels - merged = metrics._merge_labels(None) - assert merged == {"env": "test", "service": "cadence"} - - # Test with additional labels - merged = metrics._merge_labels({"operation": "poll"}) - assert merged == {"env": "test", "service": "cadence", "operation": "poll"} - - # Test label override - merged = metrics._merge_labels({"env": "prod", "operation": "poll"}) - assert merged == {"env": "prod", "service": "cadence", "operation": "poll"} - - @patch("cadence._internal.visibility.prometheus.Counter") - def test_counter(self, mock_counter_class): - """Test counter metric.""" + @patch('cadence._internal.visibility.prometheus.Counter') + def test_counter_metric(self, mock_counter_class): + """Test counter metric creation and usage.""" mock_counter = Mock() - mock_labeled_counter = Mock() - mock_counter.labels.return_value = mock_labeled_counter mock_counter_class.return_value = mock_counter - - config = PrometheusConfig() - metrics = PrometheusMetrics(config) - - # Test without labels - metrics.counter("test_counter", 2) - mock_counter.inc.assert_called_once_with(2) - - # Reset mock - mock_counter.reset_mock() - mock_labeled_counter.reset_mock() - - # Test with labels - metrics.counter("test_counter", 1, {"env": "test"}) - mock_counter.labels.assert_called_once_with(env="test") - mock_labeled_counter.inc.assert_called_once_with(1) - - @patch("cadence._internal.visibility.prometheus.Gauge") - def test_gauge(self, mock_gauge_class): - """Test gauge metric.""" + + metrics = PrometheusMetrics() + metrics.counter("test_counter", 5, {"label": "value"}) + + # Verify counter was created + mock_counter_class.assert_called_once() + mock_counter.labels.assert_called_once_with(label="value") + mock_counter.labels.return_value.inc.assert_called_once_with(5) + + @patch('cadence._internal.visibility.prometheus.Gauge') + def test_gauge_metric(self, mock_gauge_class): + """Test gauge metric creation and usage.""" mock_gauge = Mock() - mock_labeled_gauge = Mock() - mock_gauge.labels.return_value = mock_labeled_gauge mock_gauge_class.return_value = mock_gauge - - config = PrometheusConfig() - metrics = PrometheusMetrics(config) - - # Test without labels - metrics.gauge("test_gauge", 42.0) - mock_gauge.set.assert_called_once_with(42.0) - - # Reset mock - mock_gauge.reset_mock() - mock_labeled_gauge.reset_mock() - - # Test with labels - metrics.gauge("test_gauge", 100.0, {"env": "test"}) - mock_gauge.labels.assert_called_once_with(env="test") - mock_labeled_gauge.set.assert_called_once_with(100.0) - - @patch("cadence._internal.visibility.prometheus.Histogram") - def test_timer(self, mock_histogram_class): - """Test timer metric.""" + + metrics = PrometheusMetrics() + metrics.gauge("test_gauge", 42.5, {"env": "prod"}) + + # Verify gauge was created + mock_gauge_class.assert_called_once() + mock_gauge.labels.assert_called_once_with(env="prod") + mock_gauge.labels.return_value.set.assert_called_once_with(42.5) + + @patch('cadence._internal.visibility.prometheus.Histogram') + def test_histogram_metric(self, mock_histogram_class): + """Test histogram metric creation and usage.""" mock_histogram = Mock() - mock_labeled_histogram = Mock() - mock_histogram.labels.return_value = mock_labeled_histogram mock_histogram_class.return_value = mock_histogram + + metrics = PrometheusMetrics() + metrics.histogram("test_histogram", 1.5, {"type": "latency"}) + + # Verify histogram was created + mock_histogram_class.assert_called_once() + mock_histogram.labels.assert_called_once_with(type="latency") + mock_histogram.labels.return_value.observe.assert_called_once_with(1.5) - config = PrometheusConfig() - metrics = PrometheusMetrics(config) - - # Test without labels - metrics.timer("test_timer", 0.5) - mock_histogram.observe.assert_called_once_with(0.5) - - # Reset mock - mock_histogram.reset_mock() - mock_labeled_histogram.reset_mock() - - # Test with labels - metrics.timer("test_timer", 1.0, {"env": "test"}) - mock_histogram.labels.assert_called_once_with(env="test") - mock_labeled_histogram.observe.assert_called_once_with(1.0) - @patch("cadence._internal.visibility.prometheus.Histogram") - def test_histogram(self, mock_histogram_class): - """Test histogram metric.""" - mock_histogram = Mock() - mock_labeled_histogram = Mock() - mock_histogram.labels.return_value = mock_labeled_histogram - mock_histogram_class.return_value = mock_histogram - - config = PrometheusConfig() + def test_metric_name_generation(self): + """Test metric name generation with prefix.""" + config = PrometheusConfig(metric_prefix="my_app_") metrics = PrometheusMetrics(config) + + metric_name = metrics._get_metric_name("test_metric") + assert metric_name == "my_app_test_metric" - # Test without labels - metrics.histogram("test_histogram", 2.5) - mock_histogram.observe.assert_called_once_with(2.5) - - # Reset mock - mock_histogram.reset_mock() - mock_labeled_histogram.reset_mock() - - # Test with labels - metrics.histogram("test_histogram", 3.0, {"env": "test"}) - mock_histogram.labels.assert_called_once_with(env="test") - mock_labeled_histogram.observe.assert_called_once_with(3.0) - - @patch("cadence._internal.visibility.prometheus.push_to_gateway") - def test_push_to_gateway(self, mock_push): - """Test pushing metrics to gateway.""" + def test_label_merging(self): + """Test label merging with default labels.""" config = PrometheusConfig( - enable_push_gateway=True, - push_gateway_url="localhost:9091", - push_job_name="test_job", + default_labels={"service": "cadence", "version": "1.0"} ) metrics = PrometheusMetrics(config) - - metrics.push_to_gateway() - - mock_push.assert_called_once_with( - "localhost:9091", job="test_job", registry=metrics.registry - ) - - def test_push_to_gateway_disabled(self): - """Test push to gateway when disabled.""" - config = PrometheusConfig(enable_push_gateway=False) - metrics = PrometheusMetrics(config) - - # Should not raise exception - metrics.push_to_gateway() - - @patch("cadence._internal.visibility.prometheus.generate_latest") - def test_get_metrics_text(self, mock_generate): - """Test getting metrics as text.""" - mock_generate.return_value = ( - b"# HELP test_counter Test counter\ntest_counter 1.0\n" - ) - - config = PrometheusConfig() - metrics = PrometheusMetrics(config) - - text = metrics.get_metrics_text() - - assert text == "# HELP test_counter Test counter\ntest_counter 1.0\n" - mock_generate.assert_called_once_with(metrics.registry) - - def test_shutdown(self): - """Test metrics shutdown.""" - config = PrometheusConfig(enable_http_server=False) - metrics = PrometheusMetrics(config) - - # Mock HTTP server - mock_server = Mock() - metrics._http_server = mock_server # type: ignore - - metrics.shutdown() - - mock_server.shutdown.assert_called_once() - assert metrics._http_server is None + + # Test merging with provided labels + merged = metrics._merge_labels({"operation": "start"}) + expected = {"service": "cadence", "version": "1.0", "operation": "start"} + assert merged == expected + + # Test merging with None labels + merged_none = metrics._merge_labels(None) + assert merged_none == {"service": "cadence", "version": "1.0"} + + @patch('cadence._internal.visibility.prometheus.generate_latest') + def test_get_metrics_text(self, mock_generate_latest): + """Test getting metrics in text format.""" + mock_generate_latest.return_value = b"# HELP test_metric Test metric\n# TYPE test_metric counter\ntest_metric 1.0\n" + + metrics = PrometheusMetrics() + result = metrics.get_metrics_text() + + assert result == "# HELP test_metric Test metric\n# TYPE test_metric counter\ntest_metric 1.0\n" + mock_generate_latest.assert_called_once_with(metrics.registry) + + def test_error_handling_in_counter(self): + """Test error handling in counter method.""" + metrics = PrometheusMetrics() + + # This should not raise an exception + with patch.object(metrics, '_get_or_create_counter', side_effect=Exception("Test error")): + metrics.counter("test_counter", 1) + # Should not raise, just log error + + def test_error_handling_in_gauge(self): + """Test error handling in gauge method.""" + metrics = PrometheusMetrics() + + # This should not raise an exception + with patch.object(metrics, '_get_or_create_gauge', side_effect=Exception("Test error")): + metrics.gauge("test_gauge", 1.0) + # Should not raise, just log error class TestCadenceMetrics: - """Test cases for CadenceMetrics constants.""" + """Test cases for CadenceMetrics enum.""" def test_workflow_metrics(self): """Test workflow metric names.""" - assert CadenceMetrics.WORKFLOW_STARTED_TOTAL == "workflow_started_total" - assert CadenceMetrics.WORKFLOW_COMPLETED_TOTAL == "workflow_completed_total" - assert CadenceMetrics.WORKFLOW_FAILED_TOTAL == "workflow_failed_total" - assert CadenceMetrics.WORKFLOW_DURATION_SECONDS == "workflow_duration_seconds" + assert CadenceMetrics.WORKFLOW_STARTED_TOTAL.value == "workflow_started_total" + assert CadenceMetrics.WORKFLOW_COMPLETED_TOTAL.value == "workflow_completed_total" + assert CadenceMetrics.WORKFLOW_FAILED_TOTAL.value == "workflow_failed_total" + assert CadenceMetrics.WORKFLOW_DURATION_SECONDS.value == "workflow_duration_seconds" def test_activity_metrics(self): """Test activity metric names.""" - assert CadenceMetrics.ACTIVITY_STARTED_TOTAL == "activity_started_total" - assert CadenceMetrics.ACTIVITY_COMPLETED_TOTAL == "activity_completed_total" - assert CadenceMetrics.ACTIVITY_FAILED_TOTAL == "activity_failed_total" - assert CadenceMetrics.ACTIVITY_DURATION_SECONDS == "activity_duration_seconds" + assert CadenceMetrics.ACTIVITY_STARTED_TOTAL.value == "activity_started_total" + assert CadenceMetrics.ACTIVITY_COMPLETED_TOTAL.value == "activity_completed_total" + assert CadenceMetrics.ACTIVITY_FAILED_TOTAL.value == "activity_failed_total" + assert CadenceMetrics.ACTIVITY_DURATION_SECONDS.value == "activity_duration_seconds" def test_worker_metrics(self): """Test worker metric names.""" - assert CadenceMetrics.WORKER_TASK_POLLS_TOTAL == "worker_task_polls_total" - assert ( - CadenceMetrics.WORKER_TASK_POLL_ERRORS_TOTAL - == "worker_task_poll_errors_total" - ) - assert CadenceMetrics.WORKER_ACTIVE_TASKS == "worker_active_tasks" + assert CadenceMetrics.WORKER_TASK_POLLS_TOTAL.value == "worker_task_polls_total" + assert CadenceMetrics.WORKER_TASK_POLL_ERRORS_TOTAL.value == "worker_task_poll_errors_total" + assert CadenceMetrics.WORKER_ACTIVE_TASKS.value == "worker_active_tasks" def test_client_metrics(self): """Test client metric names.""" - assert CadenceMetrics.CLIENT_REQUESTS_TOTAL == "client_requests_total" - assert ( - CadenceMetrics.CLIENT_REQUEST_DURATION_SECONDS - == "client_request_duration_seconds" - ) - assert ( - CadenceMetrics.CLIENT_REQUEST_ERRORS_TOTAL == "client_request_errors_total" - ) - - + assert CadenceMetrics.CLIENT_REQUESTS_TOTAL.value == "client_requests_total" + assert CadenceMetrics.CLIENT_REQUEST_DURATION_SECONDS.value == "client_request_duration_seconds" + assert CadenceMetrics.CLIENT_REQUEST_ERRORS_TOTAL.value == "client_request_errors_total" \ No newline at end of file From 6ac8143300ab74f7db3c41490ae2ce9fe19c4ccd Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 30 Sep 2025 15:13:53 -0700 Subject: [PATCH 5/5] respond to comments Signed-off-by: Tim Li --- cadence/_internal/visibility/__init__.py | 12 ----- cadence/client.py | 2 +- cadence/metrics/__init__.py | 12 +++++ .../visibility => metrics}/metrics.py | 0 .../visibility => metrics}/prometheus.py | 37 ++----------- .../visibility => metrics}/test_metrics.py | 2 +- .../visibility => metrics}/test_prometheus.py | 53 +++---------------- 7 files changed, 25 insertions(+), 93 deletions(-) delete mode 100644 cadence/_internal/visibility/__init__.py create mode 100644 cadence/metrics/__init__.py rename cadence/{_internal/visibility => metrics}/metrics.py (100%) rename cadence/{_internal/visibility => metrics}/prometheus.py (80%) rename tests/cadence/{_internal/visibility => metrics}/test_metrics.py (96%) rename tests/cadence/{_internal/visibility => metrics}/test_prometheus.py (67%) diff --git a/cadence/_internal/visibility/__init__.py b/cadence/_internal/visibility/__init__.py deleted file mode 100644 index 989ceee..0000000 --- a/cadence/_internal/visibility/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Visibility and metrics collection components for Cadence client.""" - -from .metrics import MetricsEmitter, NoOpMetricsEmitter -from .prometheus import PrometheusMetrics, PrometheusConfig, CadenceMetrics - -__all__ = [ - "MetricsEmitter", - "NoOpMetricsEmitter", - "PrometheusMetrics", - "PrometheusConfig", - "CadenceMetrics", -] diff --git a/cadence/client.py b/cadence/client.py index 8159995..77ec95c 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -12,7 +12,7 @@ from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub from cadence.data_converter import DataConverter, DefaultDataConverter -from cadence._internal.visibility.metrics import MetricsEmitter, NoOpMetricsEmitter +from cadence.metrics import MetricsEmitter, NoOpMetricsEmitter class ClientOptions(TypedDict, total=False): diff --git a/cadence/metrics/__init__.py b/cadence/metrics/__init__.py new file mode 100644 index 0000000..a933fea --- /dev/null +++ b/cadence/metrics/__init__.py @@ -0,0 +1,12 @@ +"""Metrics collection components for Cadence client.""" + +from .metrics import MetricsEmitter, NoOpMetricsEmitter, MetricType +from .prometheus import PrometheusMetrics, PrometheusConfig + +__all__ = [ + "MetricsEmitter", + "NoOpMetricsEmitter", + "MetricType", + "PrometheusMetrics", + "PrometheusConfig", +] diff --git a/cadence/_internal/visibility/metrics.py b/cadence/metrics/metrics.py similarity index 100% rename from cadence/_internal/visibility/metrics.py rename to cadence/metrics/metrics.py diff --git a/cadence/_internal/visibility/prometheus.py b/cadence/metrics/prometheus.py similarity index 80% rename from cadence/_internal/visibility/prometheus.py rename to cadence/metrics/prometheus.py index 66cab93..277c863 100644 --- a/cadence/_internal/visibility/prometheus.py +++ b/cadence/metrics/prometheus.py @@ -2,7 +2,6 @@ import logging from dataclasses import dataclass, field -from enum import Enum from typing import Dict, Optional from prometheus_client import ( # type: ignore[import-not-found] @@ -14,7 +13,7 @@ generate_latest, ) -from cadence._internal.visibility.metrics import MetricsEmitter +from .metrics import MetricsEmitter logger = logging.getLogger(__name__) @@ -24,9 +23,6 @@ class PrometheusConfig: """Configuration for Prometheus metrics.""" - # Metric name prefix - metric_prefix: str = "cadence_" - # Default labels to apply to all metrics default_labels: Dict[str, str] = field(default_factory=dict) @@ -47,8 +43,8 @@ def __init__(self, config: Optional[PrometheusConfig] = None): self._histograms: Dict[str, Histogram] = {} def _get_metric_name(self, name: str) -> str: - """Get the full metric name with prefix.""" - return f"{self.config.metric_prefix}{name}" + """Get the metric name.""" + return name def _merge_labels(self, labels: Optional[Dict[str, str]]) -> Dict[str, str]: """Merge provided labels with default labels.""" @@ -169,30 +165,3 @@ def get_metrics_text(self) -> str: except Exception as e: logger.error(f"Failed to generate metrics text: {e}") return "" - - -# Default Cadence metrics names -class CadenceMetrics(Enum): - """Standard Cadence client metrics.""" - - # Workflow metrics - WORKFLOW_STARTED_TOTAL = "workflow_started_total" - WORKFLOW_COMPLETED_TOTAL = "workflow_completed_total" - WORKFLOW_FAILED_TOTAL = "workflow_failed_total" - WORKFLOW_DURATION_SECONDS = "workflow_duration_seconds" - - # Activity metrics - ACTIVITY_STARTED_TOTAL = "activity_started_total" - ACTIVITY_COMPLETED_TOTAL = "activity_completed_total" - ACTIVITY_FAILED_TOTAL = "activity_failed_total" - ACTIVITY_DURATION_SECONDS = "activity_duration_seconds" - - # Worker metrics - WORKER_TASK_POLLS_TOTAL = "worker_task_polls_total" - WORKER_TASK_POLL_ERRORS_TOTAL = "worker_task_poll_errors_total" - WORKER_ACTIVE_TASKS = "worker_active_tasks" - - # Client metrics - CLIENT_REQUESTS_TOTAL = "client_requests_total" - CLIENT_REQUEST_DURATION_SECONDS = "client_request_duration_seconds" - CLIENT_REQUEST_ERRORS_TOTAL = "client_request_errors_total" diff --git a/tests/cadence/_internal/visibility/test_metrics.py b/tests/cadence/metrics/test_metrics.py similarity index 96% rename from tests/cadence/_internal/visibility/test_metrics.py rename to tests/cadence/metrics/test_metrics.py index 2924969..fdf4bd6 100644 --- a/tests/cadence/_internal/visibility/test_metrics.py +++ b/tests/cadence/metrics/test_metrics.py @@ -3,7 +3,7 @@ from unittest.mock import Mock -from cadence._internal.visibility.metrics import ( +from cadence.metrics import ( MetricsEmitter, MetricType, NoOpMetricsEmitter, diff --git a/tests/cadence/_internal/visibility/test_prometheus.py b/tests/cadence/metrics/test_prometheus.py similarity index 67% rename from tests/cadence/_internal/visibility/test_prometheus.py rename to tests/cadence/metrics/test_prometheus.py index f16bd3e..ad561e1 100644 --- a/tests/cadence/_internal/visibility/test_prometheus.py +++ b/tests/cadence/metrics/test_prometheus.py @@ -2,10 +2,9 @@ from unittest.mock import Mock, patch -from cadence._internal.visibility.prometheus import ( +from cadence.metrics import ( PrometheusMetrics, PrometheusConfig, - CadenceMetrics, ) @@ -15,7 +14,6 @@ class TestPrometheusConfig: def test_default_config(self): """Test default configuration values.""" config = PrometheusConfig() - assert config.metric_prefix == "cadence_" assert config.default_labels == {} assert config.registry is None @@ -25,11 +23,9 @@ def test_custom_config(self): registry = CollectorRegistry() config = PrometheusConfig( - metric_prefix="my_", default_labels={"env": "test"}, registry=registry ) - assert config.metric_prefix == "my_" assert config.default_labels == {"env": "test"} assert config.registry is registry @@ -40,7 +36,6 @@ class TestPrometheusMetrics: def test_init_with_default_config(self): """Test initialization with default config.""" metrics = PrometheusMetrics() - assert metrics.config.metric_prefix == "cadence_" assert metrics.registry is not None def test_init_with_custom_config(self): @@ -49,15 +44,13 @@ def test_init_with_custom_config(self): registry = CollectorRegistry() config = PrometheusConfig( - metric_prefix="custom_", default_labels={"service": "test"}, registry=registry ) metrics = PrometheusMetrics(config) - assert metrics.config.metric_prefix == "custom_" assert metrics.registry is registry - @patch('cadence._internal.visibility.prometheus.Counter') + @patch('cadence.metrics.prometheus.Counter') def test_counter_metric(self, mock_counter_class): """Test counter metric creation and usage.""" mock_counter = Mock() @@ -71,7 +64,7 @@ def test_counter_metric(self, mock_counter_class): mock_counter.labels.assert_called_once_with(label="value") mock_counter.labels.return_value.inc.assert_called_once_with(5) - @patch('cadence._internal.visibility.prometheus.Gauge') + @patch('cadence.metrics.prometheus.Gauge') def test_gauge_metric(self, mock_gauge_class): """Test gauge metric creation and usage.""" mock_gauge = Mock() @@ -85,7 +78,7 @@ def test_gauge_metric(self, mock_gauge_class): mock_gauge.labels.assert_called_once_with(env="prod") mock_gauge.labels.return_value.set.assert_called_once_with(42.5) - @patch('cadence._internal.visibility.prometheus.Histogram') + @patch('cadence.metrics.prometheus.Histogram') def test_histogram_metric(self, mock_histogram_class): """Test histogram metric creation and usage.""" mock_histogram = Mock() @@ -101,12 +94,11 @@ def test_histogram_metric(self, mock_histogram_class): def test_metric_name_generation(self): - """Test metric name generation with prefix.""" - config = PrometheusConfig(metric_prefix="my_app_") - metrics = PrometheusMetrics(config) + """Test metric name generation.""" + metrics = PrometheusMetrics() metric_name = metrics._get_metric_name("test_metric") - assert metric_name == "my_app_test_metric" + assert metric_name == "test_metric" def test_label_merging(self): """Test label merging with default labels.""" @@ -124,7 +116,7 @@ def test_label_merging(self): merged_none = metrics._merge_labels(None) assert merged_none == {"service": "cadence", "version": "1.0"} - @patch('cadence._internal.visibility.prometheus.generate_latest') + @patch('cadence.metrics.prometheus.generate_latest') def test_get_metrics_text(self, mock_generate_latest): """Test getting metrics in text format.""" mock_generate_latest.return_value = b"# HELP test_metric Test metric\n# TYPE test_metric counter\ntest_metric 1.0\n" @@ -153,32 +145,3 @@ def test_error_handling_in_gauge(self): metrics.gauge("test_gauge", 1.0) # Should not raise, just log error - -class TestCadenceMetrics: - """Test cases for CadenceMetrics enum.""" - - def test_workflow_metrics(self): - """Test workflow metric names.""" - assert CadenceMetrics.WORKFLOW_STARTED_TOTAL.value == "workflow_started_total" - assert CadenceMetrics.WORKFLOW_COMPLETED_TOTAL.value == "workflow_completed_total" - assert CadenceMetrics.WORKFLOW_FAILED_TOTAL.value == "workflow_failed_total" - assert CadenceMetrics.WORKFLOW_DURATION_SECONDS.value == "workflow_duration_seconds" - - def test_activity_metrics(self): - """Test activity metric names.""" - assert CadenceMetrics.ACTIVITY_STARTED_TOTAL.value == "activity_started_total" - assert CadenceMetrics.ACTIVITY_COMPLETED_TOTAL.value == "activity_completed_total" - assert CadenceMetrics.ACTIVITY_FAILED_TOTAL.value == "activity_failed_total" - assert CadenceMetrics.ACTIVITY_DURATION_SECONDS.value == "activity_duration_seconds" - - def test_worker_metrics(self): - """Test worker metric names.""" - assert CadenceMetrics.WORKER_TASK_POLLS_TOTAL.value == "worker_task_polls_total" - assert CadenceMetrics.WORKER_TASK_POLL_ERRORS_TOTAL.value == "worker_task_poll_errors_total" - assert CadenceMetrics.WORKER_ACTIVE_TASKS.value == "worker_active_tasks" - - def test_client_metrics(self): - """Test client metric names.""" - assert CadenceMetrics.CLIENT_REQUESTS_TOTAL.value == "client_requests_total" - assert CadenceMetrics.CLIENT_REQUEST_DURATION_SECONDS.value == "client_request_duration_seconds" - assert CadenceMetrics.CLIENT_REQUEST_ERRORS_TOTAL.value == "client_request_errors_total" \ No newline at end of file