From 89dfc1fe5247782a8c94eab210247b7e02e12e9f Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 5 Nov 2025 12:03:25 -0500 Subject: [PATCH 1/3] chore: Add support for FDv1 polling synchronizer --- ldclient/config.py | 2 - ldclient/impl/datasourcev2/polling.py | 129 ++++++++- ldclient/impl/datasourcev2/streaming.py | 31 +- ldclient/impl/datasystem/config.py | 29 ++ ldclient/impl/datasystem/fdv2.py | 6 +- ldclient/impl/datasystem/store.py | 22 +- ldclient/impl/util.py | 11 +- .../test_polling_payload_parsing.py | 211 ++++++++++++++ .../impl/datasystem/test_fdv2_datasystem.py | 265 ++++++++++++++++++ .../impl/datasystem/test_fdv2_persistence.py | 7 +- 10 files changed, 682 insertions(+), 31 deletions(-) diff --git a/ldclient/config.py b/ldclient/config.py index 7d4a7901..6d690637 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -179,8 +179,6 @@ class DataSystemConfig: data_store: Optional[FeatureStore] = None """The (optional) persistent data store instance.""" - # TODO(fdv2): Implement this synchronizer up and hook it up everywhere. - # TODO(fdv2): Remove this when FDv2 is fully launched fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None """An optional fallback synchronizer that will read from FDv1""" diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index 8f867097..f2262956 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -14,6 +14,7 @@ import urllib3 from ldclient.config import Config +from ldclient.impl.datasource.feature_requester import LATEST_ALL_URI from ldclient.impl.datasystem import BasisResult, SelectorStore, Update from ldclient.impl.datasystem.protocolv2 import ( Basis, @@ -22,6 +23,7 @@ DeleteObject, EventName, IntentCode, + Payload, PutObject, Selector, ServerIntent @@ -43,6 +45,7 @@ DataSourceErrorKind, DataSourceState ) +from ldclient.versioned_data_kind import FEATURES, SEGMENTS POLLING_ENDPOINT = "/sdk/poll" @@ -123,6 +126,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: ), ) + fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true' + if fallback: + yield Update( + state=DataSourceState.OFF, + error=error_info, + revert_to_fdv1=True + ) + break + status_code = result.exception.status if is_http_error_recoverable(status_code): # TODO(fdv2): Add support for environment ID @@ -158,6 +170,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: state=DataSourceState.VALID, change_set=change_set, environment_id=headers.get("X-LD-EnvID"), + revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true' ) if self._event.wait(self._poll_interval): @@ -262,7 +275,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult: if response.status >= 400: return _Fail( - f"HTTP error {response}", UnsuccessfulResponseException(response.status) + f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers) ) headers = response.headers @@ -375,3 +388,117 @@ def build(self) -> PollingDataSource: return PollingDataSource( poll_interval=self._config.poll_interval, requester=requester ) + + +# pylint: disable=too-few-public-methods +class Urllib3FDv1PollingRequester: + """ + Urllib3PollingRequesterFDv1 is a Requester that uses urllib3 to make HTTP + requests. + """ + + def __init__(self, config: Config): + self._etag = None + self._http = _http_factory(config).create_pool_manager(1, config.base_uri) + self._config = config + self._poll_uri = config.base_uri + LATEST_ALL_URI + + def fetch(self, selector: Optional[Selector]) -> PollingResult: + """ + Fetches the data for the given selector. + Returns a Result containing a tuple of ChangeSet and any request headers, + or an error if the data could not be retrieved. + """ + query_params = {} + if self._config.payload_filter_key is not None: + query_params["filter"] = self._config.payload_filter_key + + uri = self._poll_uri + if len(query_params) > 0: + filter_query = parse.urlencode(query_params) + uri += f"?{filter_query}" + + hdrs = _headers(self._config) + hdrs["Accept-Encoding"] = "gzip" + + if self._etag is not None: + hdrs["If-None-Match"] = self._etag + + response = self._http.request( + "GET", + uri, + headers=hdrs, + timeout=urllib3.Timeout( + connect=self._config.http.connect_timeout, + read=self._config.http.read_timeout, + ), + retries=1, + ) + + if response.status >= 400: + return _Fail( + f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers) + ) + + headers = response.headers + + if response.status == 304: + return _Success(value=(ChangeSetBuilder.no_changes(), headers)) + + data = json.loads(response.data.decode("UTF-8")) + etag = headers.get("ETag") + + if etag is not None: + self._etag = etag + + log.debug( + "%s response status:[%d] ETag:[%s]", + uri, + response.status, + etag, + ) + + changeset_result = fdv1_polling_payload_to_changeset(data) + if isinstance(changeset_result, _Success): + return _Success(value=(changeset_result.value, headers)) + + return _Fail( + error=changeset_result.error, + exception=changeset_result.exception, + ) + + +# pylint: disable=too-many-branches,too-many-return-statements +def fdv1_polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]: + """ + Converts a fdv1 polling payload into a ChangeSet. + """ + builder = ChangeSetBuilder() + builder.start(IntentCode.TRANSFER_FULL) + selector = Selector.no_selector() + + # FDv1 uses "flags" instead of "features", so we need to map accordingly + kind_mappings = [ + (FEATURES, "flags"), + (SEGMENTS, "segments") + ] + + for kind, fdv1_key in kind_mappings: + kind_data = data.get(fdv1_key) + if kind_data is None: + continue + if not isinstance(kind_data, dict): + return _Fail(error=f"Invalid format: {fdv1_key} is not a dictionary") + + for key in kind_data: + flag_or_segment = kind_data.get(key) + if flag_or_segment is None or not isinstance(flag_or_segment, dict): + return _Fail(error=f"Invalid format: {key} is not a dictionary") + + version = flag_or_segment.get('version') + if version is None: + return _Fail(error=f"Invalid format: {key} does not have a version set") + + builder.add_put(kind, key, version, flag_or_segment) + + return _Success(builder.finish(selector)) diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 0f6590dc..5edd0450 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -4,13 +4,12 @@ """ import json -from abc import abstractmethod from time import time -from typing import Callable, Generator, Iterable, Optional, Protocol, Tuple +from typing import Callable, Generator, Optional, Tuple from urllib import parse from ld_eventsource import SSEClient -from ld_eventsource.actions import Action, Event, Fault +from ld_eventsource.actions import Event, Fault, Start from ld_eventsource.config import ( ConnectStrategy, ErrorStrategy, @@ -151,6 +150,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: break continue + if isinstance(action, Start) and action.headers is not None: + fallback = action.headers.get('X-LD-FD-Fallback') == 'true' + if fallback: + yield Update( + state=DataSourceState.OFF, + revert_to_fdv1=True + ) + break + if not isinstance(action, Event): continue @@ -188,11 +196,6 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: # if update is not None: # self._record_stream_init(False) - # if self._data_source_update_sink is not None: - # self._data_source_update_sink.update_status( - # DataSourceState.VALID, None - # ) - self._sse.close() def stop(self): @@ -288,6 +291,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: If an update is provided, it should be forward upstream, regardless of whether or not we are going to retry this failure. + + The return should be thought of (update, should_continue) """ if not self._running: return (None, False) # don't retry if we've been deliberately stopped @@ -315,12 +320,18 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: str(error), ) + if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true': + update = Update( + state=DataSourceState.OFF, + error=error_info, + revert_to_fdv1=True + ) + return (update, False) + http_error_message_result = http_error_message( error.status, "stream connection" ) - is_recoverable = is_http_error_recoverable(error.status) - update = Update( state=( DataSourceState.INTERRUPTED diff --git a/ldclient/impl/datasystem/config.py b/ldclient/impl/datasystem/config.py index b179ff9f..d3b34a7a 100644 --- a/ldclient/impl/datasystem/config.py +++ b/ldclient/impl/datasystem/config.py @@ -9,6 +9,7 @@ from ldclient.impl.datasourcev2.polling import ( PollingDataSource, PollingDataSourceBuilder, + Urllib3FDv1PollingRequester, Urllib3PollingRequester ) from ldclient.impl.datasourcev2.streaming import ( @@ -55,6 +56,17 @@ def synchronizers( self._secondary_synchronizer = secondary return self + def fdv1_compatible_synchronizer( + self, + fallback: Builder[Synchronizer] + ) -> "ConfigBuilder": + """ + Configures the SDK with a fallback synchronizer that is compatible with + the Flag Delivery v1 API. + """ + self._fdv1_fallback_synchronizer = fallback + return self + def data_store(self, data_store: FeatureStore, store_mode: DataStoreMode) -> "ConfigBuilder": """ Sets the data store configuration for the data system. @@ -91,6 +103,17 @@ def builder(config: LDConfig) -> PollingDataSource: return builder +def fdv1_fallback_ds_builder() -> Builder[PollingDataSource]: + def builder(config: LDConfig) -> PollingDataSource: + requester = Urllib3FDv1PollingRequester(config) + polling_ds = PollingDataSourceBuilder(config) + polling_ds.requester(requester) + + return polling_ds.build() + + return builder + + def streaming_ds_builder() -> Builder[StreamingDataSource]: def builder(config: LDConfig) -> StreamingDataSource: return StreamingDataSourceBuilder(config).build() @@ -114,10 +137,12 @@ def default() -> ConfigBuilder: polling_builder = polling_ds_builder() streaming_builder = streaming_ds_builder() + fallback = fdv1_fallback_ds_builder() builder = ConfigBuilder() builder.initializers([polling_builder]) builder.synchronizers(streaming_builder, polling_builder) + builder.fdv1_compatible_synchronizer(fallback) return builder @@ -130,9 +155,11 @@ def streaming() -> ConfigBuilder: """ streaming_builder = streaming_ds_builder() + fallback = fdv1_fallback_ds_builder() builder = ConfigBuilder() builder.synchronizers(streaming_builder) + builder.fdv1_compatible_synchronizer(fallback) return builder @@ -145,9 +172,11 @@ def polling() -> ConfigBuilder: """ polling_builder: Builder[Synchronizer] = polling_ds_builder() + fallback = fdv1_fallback_ds_builder() builder = ConfigBuilder() builder.synchronizers(polling_builder) + builder.fdv1_compatible_synchronizer(fallback) return builder diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 8123237b..580aafb2 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -443,9 +443,13 @@ def _consume_synchronizer_results( # Update status self._data_source_status_provider.update_status(update.state, update.error) + # Check if we should revert to FDv1 immediately + if update.revert_to_fdv1: + return True, True + # Check for OFF state indicating permanent failure if update.state == DataSourceState.OFF: - return True, update.revert_to_fdv1 + return True, False # Check condition periodically current_status = self._data_source_status_provider.status diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index 15bc432b..20aea90e 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -306,13 +306,6 @@ def _set_basis( # Update dependency tracker self._reset_dependency_tracker(collections) - # Send change events if we had listeners - if old_data is not None: - affected_items = self._compute_changed_items_for_full_data_set( - old_data, collections - ) - self._send_change_events(affected_items) - # Update state self._persist = persist self._selector = selector if selector is not None else Selector.no_selector() @@ -324,6 +317,13 @@ def _set_basis( if self._should_persist(): self._persistent_store.init(collections) # type: ignore + # Send change events if we had listeners + if old_data is not None: + affected_items = self._compute_changed_items_for_full_data_set( + old_data, collections + ) + self._send_change_events(affected_items) + def _apply_delta( self, collections: Collections, selector: Optional[Selector], persist: bool ) -> None: @@ -353,10 +353,6 @@ def _apply_delta( affected_items, KindAndKey(kind=kind, key=key) ) - # Send change events - if affected_items: - self._send_change_events(affected_items) - # Update state self._persist = persist self._selector = selector if selector is not None else Selector.no_selector() @@ -368,6 +364,10 @@ def _apply_delta( item = kind_data[i] self._persistent_store.upsert(kind, item) # type: ignore + # Send change events + if affected_items: + self._send_change_events(affected_items) + def _should_persist(self) -> bool: """Returns whether data should be persisted to the persistent store.""" return ( diff --git a/ldclient/impl/util.py b/ldclient/impl/util.py index e60feb9d..81054f4b 100644 --- a/ldclient/impl/util.py +++ b/ldclient/impl/util.py @@ -4,7 +4,7 @@ import time from dataclasses import dataclass from datetime import timedelta -from typing import Any, Generic, Optional, TypeVar, Union +from typing import Any, Dict, Generic, Optional, TypeVar, Union from urllib.parse import urlparse, urlunparse from ldclient.impl.http import _base_headers @@ -117,18 +117,23 @@ def __str__(self, *args, **kwargs): class UnsuccessfulResponseException(Exception): - def __init__(self, status): + def __init__(self, status, headers={}): super(UnsuccessfulResponseException, self).__init__("HTTP error %d" % status) self._status = status + self._headers = headers @property def status(self): return self._status + @property + def headers(self): + return self._headers + def throw_if_unsuccessful_response(resp): if resp.status >= 400: - raise UnsuccessfulResponseException(resp.status) + raise UnsuccessfulResponseException(resp.status, resp.headers) def is_http_error_recoverable(status): diff --git a/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py index dae87706..f47c6432 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py @@ -2,10 +2,12 @@ from ldclient.impl.datasourcev2.polling import ( IntentCode, + fdv1_polling_payload_to_changeset, polling_payload_to_changeset ) from ldclient.impl.datasystem.protocolv2 import ChangeType, ObjectKind from ldclient.impl.util import _Fail, _Success +from ldclient.versioned_data_kind import FEATURES, SEGMENTS def test_payload_is_missing_events_key(): @@ -151,3 +153,212 @@ def test_fails_if_starts_with_put(): assert ( result.exception.args[0] == "changeset: cannot complete without a server-intent" ) + + +# FDv1 Payload Parsing Tests +def test_fdv1_payload_empty_flags_and_segments(): + """Test that FDv1 payload with empty flags and segments produces empty changeset.""" + data = { + "flags": {}, + "segments": {} + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Success) + + change_set = result.value + assert change_set.intent_code == IntentCode.TRANSFER_FULL + assert len(change_set.changes) == 0 + # FDv1 doesn't use selectors + assert change_set.selector is not None + assert not change_set.selector.is_defined() + + +def test_fdv1_payload_with_single_flag(): + """Test that FDv1 payload with a single flag is parsed correctly.""" + data = { + "flags": { + "test-flag": { + "key": "test-flag", + "version": 1, + "on": True, + "variations": [True, False] + } + }, + "segments": {} + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Success) + + change_set = result.value + assert change_set.intent_code == IntentCode.TRANSFER_FULL + assert len(change_set.changes) == 1 + + change = change_set.changes[0] + assert change.action == ChangeType.PUT + assert change.kind == FEATURES + assert change.key == "test-flag" + assert change.version == 1 + + +def test_fdv1_payload_with_multiple_flags(): + """Test that FDv1 payload with multiple flags is parsed correctly.""" + data = { + "flags": { + "flag-1": {"key": "flag-1", "version": 1, "on": True}, + "flag-2": {"key": "flag-2", "version": 2, "on": False}, + "flag-3": {"key": "flag-3", "version": 3, "on": True} + }, + "segments": {} + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Success) + + change_set = result.value + assert len(change_set.changes) == 3 + + flag_keys = {c.key for c in change_set.changes} + assert flag_keys == {"flag-1", "flag-2", "flag-3"} + + +def test_fdv1_payload_with_single_segment(): + """Test that FDv1 payload with a single segment is parsed correctly.""" + data = { + "flags": {}, + "segments": { + "test-segment": { + "key": "test-segment", + "version": 5, + "included": ["user1", "user2"] + } + } + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Success) + + change_set = result.value + assert len(change_set.changes) == 1 + + change = change_set.changes[0] + assert change.action == ChangeType.PUT + assert change.kind == SEGMENTS + assert change.key == "test-segment" + assert change.version == 5 + + +def test_fdv1_payload_with_flags_and_segments(): + """Test that FDv1 payload with both flags and segments is parsed correctly.""" + data = { + "flags": { + "flag-1": {"key": "flag-1", "version": 1, "on": True}, + "flag-2": {"key": "flag-2", "version": 2, "on": False} + }, + "segments": { + "segment-1": {"key": "segment-1", "version": 10}, + "segment-2": {"key": "segment-2", "version": 20} + } + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Success) + + change_set = result.value + assert len(change_set.changes) == 4 + + flag_changes = [c for c in change_set.changes if c.kind == FEATURES] + segment_changes = [c for c in change_set.changes if c.kind == SEGMENTS] + + assert len(flag_changes) == 2 + assert len(segment_changes) == 2 + + +def test_fdv1_payload_flags_not_dict(): + """Test that FDv1 payload parser fails when flags namespace is not a dict.""" + data = { + "flags": "not a dict" + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert "not a dictionary" in result.error + + +def test_fdv1_payload_segments_not_dict(): + """Test that FDv1 payload parser fails when segments namespace is not a dict.""" + data = { + "flags": {}, + "segments": "not a dict" + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert "not a dictionary" in result.error + + +def test_fdv1_payload_flag_value_not_dict(): + """Test that FDv1 payload parser fails when a flag value is not a dict.""" + data = { + "flags": { + "bad-flag": "not a dict" + } + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert "not a dictionary" in result.error + + +def test_fdv1_payload_flag_missing_version(): + """Test that FDv1 payload parser fails when a flag is missing version.""" + data = { + "flags": { + "no-version-flag": { + "key": "no-version-flag", + "on": True + } + } + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert "does not have a version set" in result.error + + +def test_fdv1_payload_segment_missing_version(): + """Test that FDv1 payload parser fails when a segment is missing version.""" + data = { + "flags": {}, + "segments": { + "no-version-segment": { + "key": "no-version-segment", + "included": [] + } + } + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert "does not have a version set" in result.error + + +def test_fdv1_payload_only_flags_no_segments_key(): + """Test that FDv1 payload works when segments key is missing entirely.""" + data = { + "flags": { + "test-flag": {"key": "test-flag", "version": 1, "on": True} + } + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Success) + + change_set = result.value + assert len(change_set.changes) == 1 + assert change_set.changes[0].key == "test-flag" + + +def test_fdv1_payload_only_segments_no_flags_key(): + """Test that FDv1 payload works when flags key is missing entirely.""" + data = { + "segments": { + "test-segment": {"key": "test-segment", "version": 1} + } + } + result = fdv1_polling_payload_to_changeset(data) + assert isinstance(result, _Success) + + change_set = result.value + assert len(change_set.changes) == 1 + assert change_set.changes[0].key == "test-segment" diff --git a/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py b/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py index 353dfa0a..c1bb6895 100644 --- a/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py +++ b/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py @@ -10,6 +10,7 @@ from ldclient.impl.datasystem.fdv2 import FDv2 from ldclient.integrations.test_datav2 import TestDataV2 from ldclient.interfaces import DataSourceState, DataSourceStatus, FlagChange +from ldclient.versioned_data_kind import FEATURES def test_two_phase_init(): @@ -157,3 +158,267 @@ def listener(status: DataSourceStatus): assert changed.wait(1), "Data system did not shut down in time" assert fdv2.data_source_status_provider.status.state == DataSourceState.OFF + + +def test_fdv2_falls_back_to_fdv1_on_polling_error_with_header(): + """ + Test that FDv2 falls back to FDv1 when polling receives an error response + with the X-LD-FD-Fallback: true header. + """ + # Create a mock primary synchronizer that signals FDv1 fallback + mock_primary: Synchronizer = Mock() + mock_primary.name = "mock-primary" + mock_primary.stop = Mock() + + # Simulate a synchronizer that yields an OFF state with revert_to_fdv1=True + from ldclient.impl.datasystem import Update + mock_primary.sync.return_value = iter([ + Update( + state=DataSourceState.OFF, + revert_to_fdv1=True + ) + ]) + + # Create FDv1 fallback data source with actual data + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=None, + primary_synchronizer=lambda _: mock_primary, + fdv1_fallback_synchronizer=td_fdv1.build_synchronizer, + ) + + changed = Event() + changes: List[FlagChange] = [] + + def listener(flag_change: FlagChange): + changes.append(flag_change) + changed.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.flag_tracker.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # Update flag in FDv1 data source to verify it's being used + td_fdv1.update(td_fdv1.flag("fdv1-flag").on(False)) + assert changed.wait(1), "Flag change listener was not called in time" + + # Verify we got flag changes from FDv1 + assert len(changes) > 0 + assert any(c.key == "fdv1-flag" for c in changes) + + +def test_fdv2_falls_back_to_fdv1_on_polling_success_with_header(): + """ + Test that FDv2 falls back to FDv1 when polling receives a successful response + with the X-LD-FD-Fallback: true header. + """ + # Create a mock primary synchronizer that yields valid data but signals fallback + mock_primary: Synchronizer = Mock() + mock_primary.name = "mock-primary" + mock_primary.stop = Mock() + + from ldclient.impl.datasystem import Update + mock_primary.sync.return_value = iter([ + Update( + state=DataSourceState.VALID, + revert_to_fdv1=True + ) + ]) + + # Create FDv1 fallback data source + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-fallback-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=None, + primary_synchronizer=lambda _: mock_primary, + fdv1_fallback_synchronizer=td_fdv1.build_synchronizer, + ) + + changed = Event() + changes: List[FlagChange] = [] + count = 0 + + def listener(flag_change: FlagChange): + nonlocal count + count += 1 + changes.append(flag_change) + if count >= 2: + changed.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.flag_tracker.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # Trigger a flag update in FDv1 + td_fdv1.update(td_fdv1.flag("fdv1-fallback-flag").on(False)) + assert changed.wait(1), "Flag change listener was not called in time" + + # Verify FDv1 is active + assert len(changes) > 0 + assert any(c.key == "fdv1-fallback-flag" for c in changes) + + +def test_fdv2_falls_back_to_fdv1_with_initializer(): + """ + Test that FDv2 falls back to FDv1 even when initialized with data, + and that the FDv1 data replaces the initialized data. + """ + # Initialize with some data + td_initializer = TestDataV2.data_source() + td_initializer.update(td_initializer.flag("initial-flag").on(True)) + + # Create mock primary that signals fallback + mock_primary: Synchronizer = Mock() + mock_primary.name = "mock-primary" + mock_primary.stop = Mock() + + from ldclient.impl.datasystem import Update + mock_primary.sync.return_value = iter([ + Update( + state=DataSourceState.OFF, + revert_to_fdv1=True + ) + ]) + + # Create FDv1 fallback with different data + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-replacement-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=[td_initializer.build_initializer], + primary_synchronizer=lambda _: mock_primary, + fdv1_fallback_synchronizer=td_fdv1.build_synchronizer, + ) + + changed = Event() + changes: List[FlagChange] = [] + + def listener(flag_change: FlagChange): + changes.append(flag_change) + if len(changes) >= 2: + changed.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.flag_tracker.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + assert changed.wait(2), "Expected flag changes for both initial and fdv1 flags" + + # Verify we got changes for both flags + flag_keys = [c.key for c in changes] + assert "initial-flag" in flag_keys + assert "fdv1-replacement-flag" in flag_keys + + +def test_fdv2_no_fallback_without_header(): + """ + Test that FDv2 does NOT fall back to FDv1 when an error occurs + but the fallback header is not present. + """ + # Create mock primary that fails but doesn't signal fallback + mock_primary: Synchronizer = Mock() + mock_primary.name = "mock-primary" + mock_primary.stop = Mock() + + from ldclient.impl.datasystem import Update + mock_primary.sync.return_value = iter([ + Update( + state=DataSourceState.INTERRUPTED, + revert_to_fdv1=False # No fallback + ) + ]) + + # Create mock secondary + mock_secondary: Synchronizer = Mock() + mock_secondary.name = "mock-secondary" + mock_secondary.stop = Mock() + mock_secondary.sync.return_value = iter([ + Update( + state=DataSourceState.VALID, + revert_to_fdv1=False + ) + ]) + + # Create FDv1 fallback (should not be used) + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-should-not-appear").on(True)) + + data_system_config = DataSystemConfig( + initializers=None, + primary_synchronizer=lambda _: mock_primary, + secondary_synchronizer=lambda _: mock_secondary, + fdv1_fallback_synchronizer=td_fdv1.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # Verify secondary was called (fallback to secondary, not FDv1) + # Give it a moment to process + import time + time.sleep(0.2) + + # The primary should have been called, then secondary + mock_primary.sync.assert_called() + mock_secondary.sync.assert_called() + + +def test_fdv2_stays_on_fdv1_after_fallback(): + """ + Test that once FDv2 falls back to FDv1, it stays on FDv1 and doesn't + attempt to recover to FDv2. + """ + # Create mock primary that signals fallback + mock_primary: Synchronizer = Mock() + mock_primary.name = "mock-primary" + mock_primary.stop = Mock() + + from ldclient.impl.datasystem import Update + mock_primary.sync.return_value = iter([ + Update( + state=DataSourceState.OFF, + revert_to_fdv1=True + ) + ]) + + # Create FDv1 fallback + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=None, + primary_synchronizer=lambda _: mock_primary, + fdv1_fallback_synchronizer=td_fdv1.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # Give it time to settle + import time + time.sleep(0.5) + + # Primary should only be called once (not retried after fallback) + assert mock_primary.sync.call_count == 1 + + # Verify FDv1 is serving data + store = fdv2.store + flag = store.get(FEATURES, "fdv1-flag", lambda x: x) + assert flag is not None diff --git a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py index f7898d58..999f4d07 100644 --- a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py +++ b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py @@ -229,12 +229,13 @@ def test_persistent_store_delta_updates_read_write(): # Set up flag change listener to detect the update flag_changed = Event() - change_count = [0] # Use list to allow modification in nested function + change_count = 0 def listener(flag_change: FlagChange): - change_count[0] += 1 + nonlocal change_count + change_count += 1 if ( - change_count[0] == 2 + change_count == 2 ): # First change is from initial sync, second is our update flag_changed.set() From c1f0ca42157441b01ed682f9a38c4d49c352a816 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 5 Nov 2025 13:46:01 -0500 Subject: [PATCH 2/3] Fix incorrect object kind --- ldclient/impl/datasourcev2/polling.py | 6 ++++-- .../impl/datasourcev2/test_polling_payload_parsing.py | 9 ++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index f2262956..a1a67702 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -23,6 +23,7 @@ DeleteObject, EventName, IntentCode, + ObjectKind, Payload, PutObject, Selector, @@ -478,9 +479,10 @@ def fdv1_polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]: selector = Selector.no_selector() # FDv1 uses "flags" instead of "features", so we need to map accordingly + # Map FDv1 JSON keys to ObjectKind enum values kind_mappings = [ - (FEATURES, "flags"), - (SEGMENTS, "segments") + (ObjectKind.FLAG, "flags"), + (ObjectKind.SEGMENT, "segments") ] for kind, fdv1_key in kind_mappings: diff --git a/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py index f47c6432..2b483e47 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py @@ -7,7 +7,6 @@ ) from ldclient.impl.datasystem.protocolv2 import ChangeType, ObjectKind from ldclient.impl.util import _Fail, _Success -from ldclient.versioned_data_kind import FEATURES, SEGMENTS def test_payload_is_missing_events_key(): @@ -195,7 +194,7 @@ def test_fdv1_payload_with_single_flag(): change = change_set.changes[0] assert change.action == ChangeType.PUT - assert change.kind == FEATURES + assert change.kind == ObjectKind.FLAG assert change.key == "test-flag" assert change.version == 1 @@ -240,7 +239,7 @@ def test_fdv1_payload_with_single_segment(): change = change_set.changes[0] assert change.action == ChangeType.PUT - assert change.kind == SEGMENTS + assert change.kind == ObjectKind.SEGMENT assert change.key == "test-segment" assert change.version == 5 @@ -263,8 +262,8 @@ def test_fdv1_payload_with_flags_and_segments(): change_set = result.value assert len(change_set.changes) == 4 - flag_changes = [c for c in change_set.changes if c.kind == FEATURES] - segment_changes = [c for c in change_set.changes if c.kind == SEGMENTS] + flag_changes = [c for c in change_set.changes if c.kind == ObjectKind.FLAG] + segment_changes = [c for c in change_set.changes if c.kind == ObjectKind.SEGMENT] assert len(flag_changes) == 2 assert len(segment_changes) == 2 From df5c41cccebd7d9560e54b075f7f25e9d1d9cd93 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 7 Nov 2025 14:47:17 -0500 Subject: [PATCH 3/3] Bump eventsource --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 731f7bef..118c3336 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ expiringdict = ">=1.1.4" pyRFC3339 = ">=1.0" semver = ">=2.10.2" urllib3 = ">=1.26.0,<3" -launchdarkly-eventsource = ">=1.4.0,<2.0.0" +launchdarkly-eventsource = ">=1.5.0,<2.0.0" redis = { version = ">=2.10.5", optional = true } python-consul = { version = ">=1.0.1", optional = true }