Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ test-all: install
.PHONY: lint
lint: #! Run type analysis and linting checks
lint: install
@mkdir -p .mypy_cache
@poetry run mypy ldclient
@poetry run isort --check --atomic ldclient contract-tests
@poetry run pycodestyle ldclient contract-tests
Expand Down
40 changes: 25 additions & 15 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from ldclient.impl.http import _http_factory
from ldclient.impl.repeating_task import RepeatingTask
from ldclient.impl.util import (
_LD_ENVID_HEADER,
_LD_FD_FALLBACK_HEADER,
UnsuccessfulResponseException,
_Fail,
_headers,
Expand Down Expand Up @@ -117,6 +119,13 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
while self._stop.is_set() is False:
result = self._requester.fetch(ss.selector())
if isinstance(result, _Fail):
fallback = None
envid = None

if result.headers is not None:
fallback = result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
envid = result.headers.get(_LD_ENVID_HEADER)

if isinstance(result.exception, UnsuccessfulResponseException):
error_info = DataSourceErrorInfo(
kind=DataSourceErrorKind.ERROR_RESPONSE,
Expand All @@ -127,28 +136,28 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
),
)

fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true'
if fallback:
yield Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
)
break

status_code = result.exception.status
if is_http_error_recoverable(status_code):
# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.INTERRUPTED,
error=error_info,
environment_id=envid,
)
continue

# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.OFF,
error=error_info,
environment_id=envid,
)
break

Expand All @@ -159,19 +168,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
message=result.error,
)

# TODO(fdv2): Go has a designation here to handle JSON decoding separately.
# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.INTERRUPTED,
error=error_info,
environment_id=envid,
)
else:
(change_set, headers) = result.value
yield Update(
state=DataSourceState.VALID,
change_set=change_set,
environment_id=headers.get("X-LD-EnvID"),
revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true'
environment_id=headers.get(_LD_ENVID_HEADER),
revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
)

if self._event.wait(self._poll_interval):
Expand Down Expand Up @@ -208,7 +216,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult:

(change_set, headers) = result.value

env_id = headers.get("X-LD-EnvID")
env_id = headers.get(_LD_ENVID_HEADER)
if not isinstance(env_id, str):
env_id = None

Expand Down Expand Up @@ -273,14 +281,14 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
),
retries=1,
)
headers = response.headers

if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
f"HTTP error {response}", UnsuccessfulResponseException(response.status),
headers=headers,
)

headers = response.headers

if response.status == 304:
return _Success(value=(ChangeSetBuilder.no_changes(), headers))

Expand All @@ -304,6 +312,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
return _Fail(
error=changeset_result.error,
exception=changeset_result.exception,
headers=headers, # type: ignore
)


Expand Down Expand Up @@ -436,13 +445,13 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
retries=1,
)

headers = response.headers
if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
f"HTTP error {response}", UnsuccessfulResponseException(response.status),
headers=headers
)

headers = response.headers

if response.status == 304:
return _Success(value=(ChangeSetBuilder.no_changes(), headers))

Expand All @@ -466,6 +475,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
return _Fail(
error=changeset_result.error,
exception=changeset_result.exception,
headers=headers,
)


Expand Down
45 changes: 28 additions & 17 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
)
from ldclient.impl.http import HTTPFactory, _http_factory
from ldclient.impl.util import (
_LD_ENVID_HEADER,
_LD_FD_FALLBACK_HEADER,
http_error_message,
is_http_error_recoverable,
log
Expand All @@ -58,7 +60,6 @@

STREAMING_ENDPOINT = "/sdk/stream"


SseClientBuilder = Callable[[Config, SelectorStore], SSEClient]


Expand Down Expand Up @@ -146,6 +147,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
self._running = True
self._connection_attempt_start_time = time()

envid = None
for action in self._sse.all:
if isinstance(action, Fault):
# If the SSE client detects the stream has closed, then it will
Expand All @@ -154,7 +156,10 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
if action.error is None:
continue

(update, should_continue) = self._handle_error(action.error)
if action.headers is not None:
envid = action.headers.get(_LD_ENVID_HEADER, envid)

(update, should_continue) = self._handle_error(action.error, envid)
if update is not None:
yield update

Expand All @@ -163,20 +168,23 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
continue

if isinstance(action, Start) and action.headers is not None:
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
envid = action.headers.get(_LD_ENVID_HEADER, envid)

if fallback:
self._record_stream_init(True)
yield Update(
state=DataSourceState.OFF,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Start Action Leaves Environment State Stale

When a Start action arrives with headers=None, the envid variable isn't updated, causing it to retain a stale value from a previous Fault action. This means subsequent event updates could incorrectly use an environment_id that was only meant for the fault's error update. The condition if isinstance(action, Start) and action.headers is not None: should update envid regardless of whether headers exist, setting it to None when headers are absent to properly reflect the new connection state.

Fix in Cursor Fix in Web

)
break

if not isinstance(action, Event):
continue

try:
update = self._process_message(action, change_set_builder)
update = self._process_message(action, change_set_builder, envid)
if update is not None:
self._record_stream_init(False)
self._connection_attempt_start_time = None
Expand All @@ -187,7 +195,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
)
self._sse.interrupt()

(update, should_continue) = self._handle_error(e)
(update, should_continue) = self._handle_error(e, envid)
if update is not None:
yield update
if not should_continue:
Expand All @@ -204,7 +212,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

self._sse.close()
Expand All @@ -226,7 +234,7 @@ def _record_stream_init(self, failed: bool):

# pylint: disable=too-many-return-statements
def _process_message(
self, msg: Event, change_set_builder: ChangeSetBuilder
self, msg: Event, change_set_builder: ChangeSetBuilder, envid: Optional[str]
) -> Optional[Update]:
"""
Processes a single message from the SSE stream and returns an Update
Expand All @@ -247,7 +255,7 @@ def _process_message(
change_set_builder.expect_changes()
return Update(
state=DataSourceState.VALID,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
return None

Expand Down Expand Up @@ -293,13 +301,13 @@ def _process_message(
return Update(
state=DataSourceState.VALID,
change_set=change_set,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

log.info("Unexpected event found in stream: %s", msg.event)
return None

def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optional[Update], bool]:
"""
This method handles errors that occur during the streaming process.

Expand Down Expand Up @@ -328,7 +336,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
DataSourceErrorKind.INVALID_DATA, 0, time(), str(error)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
return (update, True)

Expand All @@ -344,11 +352,15 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
str(error),
)

if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true':
if envid is None and error.headers is not None:
envid = error.headers.get(_LD_ENVID_HEADER)

if error.headers is not None and error.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
update = Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
)
return (update, False)

Expand All @@ -364,7 +376,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
),
error=error_info,
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

if not is_recoverable:
Expand All @@ -386,7 +398,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
DataSourceErrorKind.UNKNOWN, 0, time(), str(error)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
# no stacktrace here because, for a typical connection error, it'll
# just be a lengthy tour of urllib3 internals
Expand All @@ -411,5 +423,4 @@ def __init__(self, config: Config):

def build(self) -> StreamingDataSource:
"""Builds a StreamingDataSource instance with the configured parameters."""
# TODO(fdv2): Add in the other controls here.
return StreamingDataSource(self._config)
15 changes: 0 additions & 15 deletions ldclient/impl/datasystem/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,3 @@ def persistent_store(store: FeatureStore) -> ConfigBuilder:
although it will keep it up-to-date.
"""
return default().data_store(store, DataStoreMode.READ_WRITE)


# TODO(fdv2): Implement these methods
#
# WithEndpoints configures the data system with custom endpoints for
# LaunchDarkly's streaming and polling synchronizers. This method is not
# necessary for most use-cases, but can be useful for testing or custom
# network configurations.
#
# Any endpoint that is not specified (empty string) will be treated as the
# default LaunchDarkly SaaS endpoint for that service.

# WithRelayProxyEndpoints configures the data system with a single endpoint
# for LaunchDarkly's streaming and polling synchronizers. The endpoint
# should be Relay Proxy's base URI, for example http://localhost:8123.
15 changes: 7 additions & 8 deletions ldclient/impl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Generic, Optional, TypeVar, Union
from typing import Any, Dict, Generic, Mapping, Optional, TypeVar, Union
from urllib.parse import urlparse, urlunparse

from ldclient.impl.http import _base_headers
Expand Down Expand Up @@ -35,6 +35,9 @@ def timedelta_millis(delta: timedelta) -> float:
# Compiled regex pattern for valid characters in application values and SDK keys
_VALID_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9._-]")

_LD_ENVID_HEADER = 'X-LD-EnvID'
_LD_FD_FALLBACK_HEADER = 'X-LD-FD-Fallback'


def validate_application_info(application: dict, logger: logging.Logger) -> dict:
return {
Expand Down Expand Up @@ -117,23 +120,18 @@ def __str__(self, *args, **kwargs):


class UnsuccessfulResponseException(Exception):
def __init__(self, status, headers={}):
def __init__(self, status):
super(UnsuccessfulResponseException, self).__init__("HTTP error %d" % status)
self._status = status
self._headers = headers

@property
def status(self):
return self._status

@property
def headers(self):
return self._headers


def throw_if_unsuccessful_response(resp):
if resp.status >= 400:
raise UnsuccessfulResponseException(resp.status, resp.headers)
raise UnsuccessfulResponseException(resp.status)


def is_http_error_recoverable(status):
Expand Down Expand Up @@ -290,6 +288,7 @@ class _Success(Generic[T]):
class _Fail(Generic[E]):
error: E
exception: Optional[Exception] = None
headers: Optional[Mapping[str, Any]] = None


# TODO(breaking): Replace the above Result class with an improved generic
Expand Down
Loading