Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@
@dataclass
class _ConfigurationState:
"""Immutable state object for configuration data."""

etag: str = ""
refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
version_cache: int = -1
settings_cache: Dict[str, str] = field(default_factory=dict)

def with_updates(self, **kwargs) -> '_ConfigurationState': # pylint: disable=C4741,C4742
def with_updates(self, **kwargs) -> "_ConfigurationState": # pylint: disable=C4741,C4742
"""Create a new state object with updated values."""
return _ConfigurationState(
etag=kwargs.get('etag', self.etag),
refresh_interval=kwargs.get('refresh_interval', self.refresh_interval),
version_cache=kwargs.get('version_cache', self.version_cache),
settings_cache=kwargs.get('settings_cache', self.settings_cache.copy())
etag=kwargs.get("etag", self.etag),
refresh_interval=kwargs.get("refresh_interval", self.refresh_interval),
version_cache=kwargs.get("version_cache", self.version_cache),
settings_cache=kwargs.get("settings_cache", self.settings_cache.copy()),
)


Expand Down Expand Up @@ -96,16 +97,16 @@ def _is_transient_error(self, response: OneSettingsResponse) -> bool:
# pylint: disable=too-many-statements, too-many-branches
def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str, str]] = None) -> int:
"""Fetch configuration from OneSettings and update local cache atomically.

This method performs a conditional HTTP request to OneSettings using the
current ETag for efficient caching. It atomically updates the local configuration
state with any new settings and manages version tracking for change detection.

When transient errors are encountered (timeouts, network exceptions, or HTTP status
codes 429, 500-504) from the CHANGE endpoint, the method doubles the current refresh
interval to reduce load on the failing service and returns immediately. The refresh
codes 429, 500-504) from the CHANGE endpoint, the method doubles the current refresh
interval to reduce load on the failing service and returns immediately. The refresh
interval is capped at 24 hours (86,400 seconds) to prevent excessively long delays.

The method implements a check-and-set pattern for thread safety:
1. Reads current state atomically to prepare request headers
2. Makes HTTP request to OneSettings CHANGE endpoint outside locks
Expand All @@ -114,12 +115,12 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
4. Re-reads current state to make version comparison decisions
5. Conditionally fetches from CONFIG endpoint if version increased
6. Updates all state fields atomically in a single operation

Version comparison logic:
- Version increase: New configuration available, fetches and caches new settings
- Version same: No changes detected, ETag and refresh interval updated safely
- Version decrease: Unexpected rollback state, logged as warning, no updates applied

Error handling:
- Transient errors (timeouts, exceptions, retryable HTTP codes) from CHANGE endpoint:
Refresh interval doubled (capped), immediate return
Expand All @@ -134,31 +135,31 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,

:return: Updated refresh interval in seconds for the next configuration check.
This value comes from the OneSettings response or is doubled (capped at 24 hours)
if transient errors are encountered from the CHANGE endpoint, determining how
if transient errors are encountered from the CHANGE endpoint, determining how
frequently the background worker should call this method.
:rtype: int

Thread Safety:
This method is thread-safe using atomic state updates. Multiple threads can
call this method concurrently without data corruption. The implementation uses
a single state lock with minimal critical sections to reduce lock contention.

HTTP requests are performed outside locks to prevent blocking other threads
during potentially slow network operations.

Caching Behavior:
The method automatically includes ETag headers for conditional requests to
minimize unnecessary data transfer. If the server responds with 304 Not Modified,
only the refresh interval is updated while preserving existing configuration.

On CONFIG endpoint failures, the ETag is intentionally not updated to ensure
the next request can retry fetching the same configuration version.

State Consistency:
All configuration state (ETag, refresh interval, version, settings) is updated
atomically using immutable state objects. This prevents race conditions where
different threads might observe inconsistent combinations of these values.

Transient Error Handling:
When transient errors are detected from the CHANGE endpoint (including timeouts,
network exceptions, or retryable HTTP status codes), the refresh interval is
Expand Down Expand Up @@ -198,9 +199,9 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
# Prepare new state updates
new_state_updates = {}
if response.etag is not None:
new_state_updates['etag'] = response.etag
new_state_updates["etag"] = response.etag
if response.refresh_interval and response.refresh_interval > 0: # type: ignore
new_state_updates['refresh_interval'] = response.refresh_interval # type: ignore
new_state_updates["refresh_interval"] = response.refresh_interval # type: ignore

if response.status_code == 304:
# Not modified: Settings unchanged, but update etag and refresh interval if provided
Expand Down Expand Up @@ -228,24 +229,26 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
if config_response.status_code == 200 and config_response.settings:
# Validate that the versions from change and config match
if config_response.version == response.version:
new_state_updates.update({
'version_cache': response.version, # type: ignore
'settings_cache': config_response.settings # type: ignore
})
new_state_updates.update(
{
"version_cache": response.version, # type: ignore
"settings_cache": config_response.settings, # type: ignore
}
)
else:
logger.warning("Version mismatch between change and config responses." \
"No configurations updated.")
logger.warning(
"Version mismatch between change and config responses. No configurations updated."
)
# We do not update etag to allow retry on next call
new_state_updates.pop('etag', None)
new_state_updates.pop("etag", None)
else:
logger.warning("Unexpected response status: %d", config_response.status_code)
# We do not update etag to allow retry on next call
new_state_updates.pop('etag', None)
new_state_updates.pop("etag", None)
else:
# No settings or version provided
logger.warning("No settings or version provided in config response. Config not updated.")


notify_callbacks = False
current_refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
state_for_callbacks = None
Expand All @@ -255,7 +258,7 @@ def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str,
latest_state = self._current_state # Always use latest state
self._current_state = latest_state.with_updates(**new_state_updates)
current_refresh_interval = self._current_state.refresh_interval
if 'settings_cache' in new_state_updates:
if "settings_cache" in new_state_updates:
notify_callbacks = True
state_for_callbacks = self._current_state

Expand All @@ -270,7 +273,7 @@ def get_settings(self) -> Dict[str, str]: # pylint: disable=C4741,C4742
with self._state_lock:
return self._current_state.settings_cache.copy() # type: ignore

def get_current_version(self) -> int: # type: ignore # pylint: disable=C4741,C4742
def get_current_version(self) -> int: # type: ignore # pylint: disable=C4741,C4742
"""Get current version."""
with self._state_lock:
return self._current_state.version_cache # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Global singleton instance for easy access throughout the codebase
_configuration_manager = None


def get_configuration_manager() -> Optional["_ConfigurationManager"]:
"""Get the global Configuration Manager singleton instance.
Expand All @@ -30,5 +31,6 @@ def get_configuration_manager() -> Optional["_ConfigurationManager"]:
global _configuration_manager # pylint: disable=global-statement
if _configuration_manager is None:
from azure.monitor.opentelemetry.exporter._configuration import _ConfigurationManager

_configuration_manager = _ConfigurationManager()
return _configuration_manager
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, Optional, Any
import json
import logging

# mypy: disable-error-code="import-untyped"
import requests

Expand All @@ -18,6 +19,7 @@

class _ConfigurationProfile:
"""Profile for the current running SDK."""

os: str = ""
rp: str = ""
attach: str = ""
Expand All @@ -28,18 +30,18 @@ class _ConfigurationProfile:
@classmethod
def fill(cls, **kwargs) -> None:
"""Update only the class variables that are provided in kwargs and haven't been updated yet."""
if 'os' in kwargs and cls.os == "":
cls.os = kwargs['os']
if 'version' in kwargs and cls.version == "":
cls.version = kwargs['version']
if 'component' in kwargs and cls.component == "":
cls.component = kwargs['component']
if 'rp' in kwargs and cls.rp == "":
cls.rp = kwargs['rp']
if 'attach' in kwargs and cls.attach == "":
cls.attach = kwargs['attach']
if 'region' in kwargs and cls.region == "":
cls.region = kwargs['region']
if "os" in kwargs and cls.os == "":
cls.os = kwargs["os"]
if "version" in kwargs and cls.version == "":
cls.version = kwargs["version"]
if "component" in kwargs and cls.component == "":
cls.component = kwargs["component"]
if "rp" in kwargs and cls.rp == "":
cls.rp = kwargs["rp"]
if "attach" in kwargs and cls.attach == "":
cls.attach = kwargs["attach"]
if "region" in kwargs and cls.region == "":
cls.region = kwargs["region"]


class OneSettingsResponse:
Expand All @@ -64,7 +66,7 @@ def __init__(
settings: Optional[Dict[str, str]] = None,
version: Optional[int] = None,
status_code: int = 200,
has_exception: bool = False
has_exception: bool = False,
):
"""Initialize OneSettingsResponse with configuration data.

Expand All @@ -86,8 +88,9 @@ def __init__(
self.has_exception = has_exception


def make_onesettings_request(url: str, query_dict: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None) -> OneSettingsResponse:
def make_onesettings_request(
url: str, query_dict: Optional[Dict[str, str]] = None, headers: Optional[Dict[str, str]] = None
) -> OneSettingsResponse:
"""Make an HTTP request to the OneSettings API and parse the response.

This function handles the complete OneSettings request lifecycle including:
Expand Down Expand Up @@ -326,6 +329,7 @@ def _matches_override_rule(override_rule: Dict[str, Any]) -> bool:
# All conditions in this rule matched
return True


# pylint:disable=too-many-return-statements
def _matches_condition(condition_key: str, condition_value: Any) -> bool:
"""Check if a specific condition matches the current configuration profile.
Expand Down Expand Up @@ -458,14 +462,15 @@ def _parse_version_with_beta(version: str) -> tuple:
:rtype: tuple
"""
# Check if version contains beta suffix
if 'b' in version:
if "b" in version:
# Split on 'b' to separate base version and beta number
base_version, beta_part = version.split('b', 1)
base_parts = [int(x) for x in base_version.split('.')]
base_version, beta_part = version.split("b", 1)
base_parts = [int(x) for x in base_version.split(".")]
beta_number = int(beta_part) if beta_part.isdigit() else 0
return tuple(base_parts + [beta_number])
# Release version - use infinity for beta part so it sorts after beta versions
base_parts = [int(x) for x in version.split('.')]
return tuple(base_parts + [float('inf')])
base_parts = [int(x) for x in version.split(".")]
return tuple(base_parts + [float("inf")])


# cSpell:enable
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

logger = logging.getLogger(__name__)


class _ConfigurationWorker:
"""Background worker thread for periodic configuration refresh from OneSettings.

Expand Down Expand Up @@ -49,11 +50,7 @@ def __init__(self, configuration_manager, refresh_interval=None) -> None:
self._lock = threading.Lock() # Single lock for all worker state

self._shutdown_event = threading.Event()
self._refresh_thread = threading.Thread(
target=self._get_configuration,
name="ConfigurationWorker",
daemon=True
)
self._refresh_thread = threading.Thread(target=self._get_configuration, name="ConfigurationWorker", daemon=True)
self._refresh_interval = refresh_interval or self._default_refresh_interval
self._shutdown_event.clear()
self._refresh_thread.start()
Expand Down Expand Up @@ -138,8 +135,9 @@ def _get_configuration(self) -> None:
while not self._shutdown_event.is_set():
try:
with self._lock:
self._refresh_interval = \
self._configuration_manager.get_configuration_and_refresh_interval(_ONE_SETTINGS_PYTHON_TARGETING)
self._refresh_interval = self._configuration_manager.get_configuration_and_refresh_interval(
_ONE_SETTINGS_PYTHON_TARGETING
)
# Capture interval while we have the lock
interval = self._refresh_interval
except Exception as ex: # pylint: disable=broad-exception-caught
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,21 @@ def _initialize(self) -> None:
# 3. Key from connection string in environment variable
# 4. Key from instrumentation key in environment variable
self.instrumentation_key = (
code_cs.get(INSTRUMENTATION_KEY) or code_ikey or \
env_cs.get(INSTRUMENTATION_KEY) or env_ikey # type: ignore
code_cs.get(INSTRUMENTATION_KEY) or code_ikey or env_cs.get(INSTRUMENTATION_KEY) or env_ikey # type: ignore
)
# The priority of the endpoints is as follows:
# 1. The endpoint explicitly passed in connection string
# 2. The endpoint from the connection string in environment variable
# 3. The default breeze endpoint
self.endpoint = (
code_cs.get(INGESTION_ENDPOINT) or env_cs.get(INGESTION_ENDPOINT) or \
"https://dc.services.visualstudio.com"
code_cs.get(INGESTION_ENDPOINT) or env_cs.get(INGESTION_ENDPOINT) or "https://dc.services.visualstudio.com"
)
self.live_endpoint = (
code_cs.get(LIVE_ENDPOINT) or env_cs.get(LIVE_ENDPOINT) or \
"https://rt.services.visualstudio.com"
code_cs.get(LIVE_ENDPOINT) or env_cs.get(LIVE_ENDPOINT) or "https://rt.services.visualstudio.com"
)
# The AUDIENCE is a url that identifies Azure Monitor in a specific cloud
# (For example: "https://monitor.azure.com/").
self.aad_audience = (
code_cs.get(AAD_AUDIENCE) or env_cs.get(AAD_AUDIENCE) # type: ignore
)
self.aad_audience = code_cs.get(AAD_AUDIENCE) or env_cs.get(AAD_AUDIENCE) # type: ignore

# Extract region information
self.region = self._extract_region() # type: ignore
Expand Down
Loading
Loading