Skip to content

Commit cd26a71

Browse files
committed
[Monitor] Generate exporter with TypeSpec
Signed-off-by: Paul Van Eck <paulvaneck@microsoft.com>
1 parent b8b89a3 commit cd26a71

File tree

112 files changed

+7701
-5614
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

112 files changed

+7701
-5614
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/__init__.py

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,19 @@
2424
@dataclass
2525
class _ConfigurationState:
2626
"""Immutable state object for configuration data."""
27+
2728
etag: str = ""
2829
refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
2930
version_cache: int = -1
3031
settings_cache: Dict[str, str] = field(default_factory=dict)
3132

32-
def with_updates(self, **kwargs) -> '_ConfigurationState': # pylint: disable=C4741,C4742
33+
def with_updates(self, **kwargs) -> "_ConfigurationState": # pylint: disable=C4741,C4742
3334
"""Create a new state object with updated values."""
3435
return _ConfigurationState(
35-
etag=kwargs.get('etag', self.etag),
36-
refresh_interval=kwargs.get('refresh_interval', self.refresh_interval),
37-
version_cache=kwargs.get('version_cache', self.version_cache),
38-
settings_cache=kwargs.get('settings_cache', self.settings_cache.copy())
36+
etag=kwargs.get("etag", self.etag),
37+
refresh_interval=kwargs.get("refresh_interval", self.refresh_interval),
38+
version_cache=kwargs.get("version_cache", self.version_cache),
39+
settings_cache=kwargs.get("settings_cache", self.settings_cache.copy()),
3940
)
4041

4142

@@ -96,16 +97,16 @@ def _is_transient_error(self, response: OneSettingsResponse) -> bool:
9697
# pylint: disable=too-many-statements, too-many-branches
9798
def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str, str]] = None) -> int:
9899
"""Fetch configuration from OneSettings and update local cache atomically.
99-
100+
100101
This method performs a conditional HTTP request to OneSettings using the
101102
current ETag for efficient caching. It atomically updates the local configuration
102103
state with any new settings and manages version tracking for change detection.
103-
104+
104105
When transient errors are encountered (timeouts, network exceptions, or HTTP status
105-
codes 429, 500-504) from the CHANGE endpoint, the method doubles the current refresh
106-
interval to reduce load on the failing service and returns immediately. The refresh
106+
codes 429, 500-504) from the CHANGE endpoint, the method doubles the current refresh
107+
interval to reduce load on the failing service and returns immediately. The refresh
107108
interval is capped at 24 hours (86,400 seconds) to prevent excessively long delays.
108-
109+
109110
The method implements a check-and-set pattern for thread safety:
110111
1. Reads current state atomically to prepare request headers
111112
2. Makes HTTP request to OneSettings CHANGE endpoint outside locks
@@ -114,12 +115,12 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
114115
4. Re-reads current state to make version comparison decisions
115116
5. Conditionally fetches from CONFIG endpoint if version increased
116117
6. Updates all state fields atomically in a single operation
117-
118+
118119
Version comparison logic:
119120
- Version increase: New configuration available, fetches and caches new settings
120121
- Version same: No changes detected, ETag and refresh interval updated safely
121122
- Version decrease: Unexpected rollback state, logged as warning, no updates applied
122-
123+
123124
Error handling:
124125
- Transient errors (timeouts, exceptions, retryable HTTP codes) from CHANGE endpoint:
125126
Refresh interval doubled (capped), immediate return
@@ -134,31 +135,31 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
134135
135136
:return: Updated refresh interval in seconds for the next configuration check.
136137
This value comes from the OneSettings response or is doubled (capped at 24 hours)
137-
if transient errors are encountered from the CHANGE endpoint, determining how
138+
if transient errors are encountered from the CHANGE endpoint, determining how
138139
frequently the background worker should call this method.
139140
:rtype: int
140-
141+
141142
Thread Safety:
142143
This method is thread-safe using atomic state updates. Multiple threads can
143144
call this method concurrently without data corruption. The implementation uses
144145
a single state lock with minimal critical sections to reduce lock contention.
145-
146+
146147
HTTP requests are performed outside locks to prevent blocking other threads
147148
during potentially slow network operations.
148-
149+
149150
Caching Behavior:
150151
The method automatically includes ETag headers for conditional requests to
151152
minimize unnecessary data transfer. If the server responds with 304 Not Modified,
152153
only the refresh interval is updated while preserving existing configuration.
153-
154+
154155
On CONFIG endpoint failures, the ETag is intentionally not updated to ensure
155156
the next request can retry fetching the same configuration version.
156-
157+
157158
State Consistency:
158159
All configuration state (ETag, refresh interval, version, settings) is updated
159160
atomically using immutable state objects. This prevents race conditions where
160161
different threads might observe inconsistent combinations of these values.
161-
162+
162163
Transient Error Handling:
163164
When transient errors are detected from the CHANGE endpoint (including timeouts,
164165
network exceptions, or retryable HTTP status codes), the refresh interval is
@@ -198,9 +199,9 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
198199
# Prepare new state updates
199200
new_state_updates = {}
200201
if response.etag is not None:
201-
new_state_updates['etag'] = response.etag
202+
new_state_updates["etag"] = response.etag
202203
if response.refresh_interval and response.refresh_interval > 0: # type: ignore
203-
new_state_updates['refresh_interval'] = response.refresh_interval # type: ignore
204+
new_state_updates["refresh_interval"] = response.refresh_interval # type: ignore
204205

205206
if response.status_code == 304:
206207
# Not modified: Settings unchanged, but update etag and refresh interval if provided
@@ -228,24 +229,26 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
228229
if config_response.status_code == 200 and config_response.settings:
229230
# Validate that the versions from change and config match
230231
if config_response.version == response.version:
231-
new_state_updates.update({
232-
'version_cache': response.version, # type: ignore
233-
'settings_cache': config_response.settings # type: ignore
234-
})
232+
new_state_updates.update(
233+
{
234+
"version_cache": response.version, # type: ignore
235+
"settings_cache": config_response.settings, # type: ignore
236+
}
237+
)
235238
else:
236-
logger.warning("Version mismatch between change and config responses." \
237-
"No configurations updated.")
239+
logger.warning(
240+
"Version mismatch between change and config responses. No configurations updated."
241+
)
238242
# We do not update etag to allow retry on next call
239-
new_state_updates.pop('etag', None)
243+
new_state_updates.pop("etag", None)
240244
else:
241245
logger.warning("Unexpected response status: %d", config_response.status_code)
242246
# We do not update etag to allow retry on next call
243-
new_state_updates.pop('etag', None)
247+
new_state_updates.pop("etag", None)
244248
else:
245249
# No settings or version provided
246250
logger.warning("No settings or version provided in config response. Config not updated.")
247251

248-
249252
notify_callbacks = False
250253
current_refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
251254
state_for_callbacks = None
@@ -255,7 +258,7 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
255258
latest_state = self._current_state # Always use latest state
256259
self._current_state = latest_state.with_updates(**new_state_updates)
257260
current_refresh_interval = self._current_state.refresh_interval
258-
if 'settings_cache' in new_state_updates:
261+
if "settings_cache" in new_state_updates:
259262
notify_callbacks = True
260263
state_for_callbacks = self._current_state
261264

@@ -270,7 +273,7 @@ def get_settings(self) -> Dict[str, str]: # pylint: disable=C4741,C4742
270273
with self._state_lock:
271274
return self._current_state.settings_cache.copy() # type: ignore
272275

273-
def get_current_version(self) -> int: # type: ignore # pylint: disable=C4741,C4742
276+
def get_current_version(self) -> int: # type: ignore # pylint: disable=C4741,C4742
274277
"""Get current version."""
275278
with self._state_lock:
276279
return self._current_state.version_cache # type: ignore

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_state.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# Global singleton instance for easy access throughout the codebase
1616
_configuration_manager = None
1717

18+
1819
def get_configuration_manager() -> Optional["_ConfigurationManager"]:
1920
"""Get the global Configuration Manager singleton instance.
2021
@@ -30,5 +31,6 @@ def get_configuration_manager() -> Optional["_ConfigurationManager"]:
3031
global _configuration_manager # pylint: disable=global-statement
3132
if _configuration_manager is None:
3233
from azure.monitor.opentelemetry.exporter._configuration import _ConfigurationManager
34+
3335
_configuration_manager = _ConfigurationManager()
3436
return _configuration_manager

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_utils.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Dict, Optional, Any
55
import json
66
import logging
7+
78
# mypy: disable-error-code="import-untyped"
89
import requests
910

@@ -18,6 +19,7 @@
1819

1920
class _ConfigurationProfile:
2021
"""Profile for the current running SDK."""
22+
2123
os: str = ""
2224
rp: str = ""
2325
attach: str = ""
@@ -28,18 +30,18 @@ class _ConfigurationProfile:
2830
@classmethod
2931
def fill(cls, **kwargs) -> None:
3032
"""Update only the class variables that are provided in kwargs and haven't been updated yet."""
31-
if 'os' in kwargs and cls.os == "":
32-
cls.os = kwargs['os']
33-
if 'version' in kwargs and cls.version == "":
34-
cls.version = kwargs['version']
35-
if 'component' in kwargs and cls.component == "":
36-
cls.component = kwargs['component']
37-
if 'rp' in kwargs and cls.rp == "":
38-
cls.rp = kwargs['rp']
39-
if 'attach' in kwargs and cls.attach == "":
40-
cls.attach = kwargs['attach']
41-
if 'region' in kwargs and cls.region == "":
42-
cls.region = kwargs['region']
33+
if "os" in kwargs and cls.os == "":
34+
cls.os = kwargs["os"]
35+
if "version" in kwargs and cls.version == "":
36+
cls.version = kwargs["version"]
37+
if "component" in kwargs and cls.component == "":
38+
cls.component = kwargs["component"]
39+
if "rp" in kwargs and cls.rp == "":
40+
cls.rp = kwargs["rp"]
41+
if "attach" in kwargs and cls.attach == "":
42+
cls.attach = kwargs["attach"]
43+
if "region" in kwargs and cls.region == "":
44+
cls.region = kwargs["region"]
4345

4446

4547
class OneSettingsResponse:
@@ -64,7 +66,7 @@ def __init__(
6466
settings: Optional[Dict[str, str]] = None,
6567
version: Optional[int] = None,
6668
status_code: int = 200,
67-
has_exception: bool = False
69+
has_exception: bool = False,
6870
):
6971
"""Initialize OneSettingsResponse with configuration data.
7072
@@ -86,8 +88,9 @@ def __init__(
8688
self.has_exception = has_exception
8789

8890

89-
def make_onesettings_request(url: str, query_dict: Optional[Dict[str, str]] = None,
90-
headers: Optional[Dict[str, str]] = None) -> OneSettingsResponse:
91+
def make_onesettings_request(
92+
url: str, query_dict: Optional[Dict[str, str]] = None, headers: Optional[Dict[str, str]] = None
93+
) -> OneSettingsResponse:
9194
"""Make an HTTP request to the OneSettings API and parse the response.
9295
9396
This function handles the complete OneSettings request lifecycle including:
@@ -326,6 +329,7 @@ def _matches_override_rule(override_rule: Dict[str, Any]) -> bool:
326329
# All conditions in this rule matched
327330
return True
328331

332+
329333
# pylint:disable=too-many-return-statements
330334
def _matches_condition(condition_key: str, condition_value: Any) -> bool:
331335
"""Check if a specific condition matches the current configuration profile.
@@ -458,14 +462,15 @@ def _parse_version_with_beta(version: str) -> tuple:
458462
:rtype: tuple
459463
"""
460464
# Check if version contains beta suffix
461-
if 'b' in version:
465+
if "b" in version:
462466
# Split on 'b' to separate base version and beta number
463-
base_version, beta_part = version.split('b', 1)
464-
base_parts = [int(x) for x in base_version.split('.')]
467+
base_version, beta_part = version.split("b", 1)
468+
base_parts = [int(x) for x in base_version.split(".")]
465469
beta_number = int(beta_part) if beta_part.isdigit() else 0
466470
return tuple(base_parts + [beta_number])
467471
# Release version - use infinity for beta part so it sorts after beta versions
468-
base_parts = [int(x) for x in version.split('.')]
469-
return tuple(base_parts + [float('inf')])
472+
base_parts = [int(x) for x in version.split(".")]
473+
return tuple(base_parts + [float("inf")])
474+
470475

471476
# cSpell:enable

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_worker.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
logger = logging.getLogger(__name__)
1010

11+
1112
class _ConfigurationWorker:
1213
"""Background worker thread for periodic configuration refresh from OneSettings.
1314
@@ -49,11 +50,7 @@ def __init__(self, configuration_manager, refresh_interval=None) -> None:
4950
self._lock = threading.Lock() # Single lock for all worker state
5051

5152
self._shutdown_event = threading.Event()
52-
self._refresh_thread = threading.Thread(
53-
target=self._get_configuration,
54-
name="ConfigurationWorker",
55-
daemon=True
56-
)
53+
self._refresh_thread = threading.Thread(target=self._get_configuration, name="ConfigurationWorker", daemon=True)
5754
self._refresh_interval = refresh_interval or self._default_refresh_interval
5855
self._shutdown_event.clear()
5956
self._refresh_thread.start()
@@ -138,8 +135,9 @@ def _get_configuration(self) -> None:
138135
while not self._shutdown_event.is_set():
139136
try:
140137
with self._lock:
141-
self._refresh_interval = \
142-
self._configuration_manager.get_configuration_and_refresh_interval(_ONE_SETTINGS_PYTHON_TARGETING)
138+
self._refresh_interval = self._configuration_manager.get_configuration_and_refresh_interval(
139+
_ONE_SETTINGS_PYTHON_TARGETING
140+
)
143141
# Capture interval while we have the lock
144142
interval = self._refresh_interval
145143
except Exception as ex: # pylint: disable=broad-exception-caught

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_connection_string_parser.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,26 +54,21 @@ def _initialize(self) -> None:
5454
# 3. Key from connection string in environment variable
5555
# 4. Key from instrumentation key in environment variable
5656
self.instrumentation_key = (
57-
code_cs.get(INSTRUMENTATION_KEY) or code_ikey or \
58-
env_cs.get(INSTRUMENTATION_KEY) or env_ikey # type: ignore
57+
code_cs.get(INSTRUMENTATION_KEY) or code_ikey or env_cs.get(INSTRUMENTATION_KEY) or env_ikey # type: ignore
5958
)
6059
# The priority of the endpoints is as follows:
6160
# 1. The endpoint explicitly passed in connection string
6261
# 2. The endpoint from the connection string in environment variable
6362
# 3. The default breeze endpoint
6463
self.endpoint = (
65-
code_cs.get(INGESTION_ENDPOINT) or env_cs.get(INGESTION_ENDPOINT) or \
66-
"https://dc.services.visualstudio.com"
64+
code_cs.get(INGESTION_ENDPOINT) or env_cs.get(INGESTION_ENDPOINT) or "https://dc.services.visualstudio.com"
6765
)
6866
self.live_endpoint = (
69-
code_cs.get(LIVE_ENDPOINT) or env_cs.get(LIVE_ENDPOINT) or \
70-
"https://rt.services.visualstudio.com"
67+
code_cs.get(LIVE_ENDPOINT) or env_cs.get(LIVE_ENDPOINT) or "https://rt.services.visualstudio.com"
7168
)
7269
# The AUDIENCE is a url that identifies Azure Monitor in a specific cloud
7370
# (For example: "https://monitor.azure.com/").
74-
self.aad_audience = (
75-
code_cs.get(AAD_AUDIENCE) or env_cs.get(AAD_AUDIENCE) # type: ignore
76-
)
71+
self.aad_audience = code_cs.get(AAD_AUDIENCE) or env_cs.get(AAD_AUDIENCE) # type: ignore
7772

7873
# Extract region information
7974
self.region = self._extract_region() # type: ignore

0 commit comments

Comments
 (0)