diff --git a/ldclient/client.py b/ldclient/client.py
index 3cd3b9be..7022f137 100644
--- a/ldclient/client.py
+++ b/ldclient/client.py
@@ -273,14 +273,13 @@ def __start_up(self, start_wait: float):
             self._data_system.data_source_status_provider
         )
         self.__flag_tracker = self._data_system.flag_tracker
-        self._store: ReadOnlyStore = self._data_system.store

         big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
         self.__big_segment_store_manager = big_segment_store_manager

         self._evaluator = Evaluator(
-            lambda key: _get_store_item(self._store, FEATURES, key),
-            lambda key: _get_store_item(self._store, SEGMENTS, key),
+            lambda key: _get_store_item(self._data_system.store, FEATURES, key),
+            lambda key: _get_store_item(self._data_system.store, SEGMENTS, key),
             lambda key: big_segment_store_manager.get_user_membership(key),
             log,
         )
@@ -571,7 +570,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
             return EvaluationDetail(default, None, error_reason('CLIENT_NOT_READY')), None

         if not self.is_initialized():
-            if self._store.initialized:
+            if self._data_system.store.initialized:
                 log.warning("Feature Flag evaluation attempted before client has initialized - using last known values from feature store for feature key: " + key)
             else:
                 log.warning("Feature Flag evaluation attempted before client has initialized! Feature store unavailable - returning default: " + str(default) + " for feature key: " + key)
@@ -584,7 +583,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
             return EvaluationDetail(default, None, error_reason('USER_NOT_SPECIFIED')), None

         try:
-            flag = _get_store_item(self._store, FEATURES, key)
+            flag = _get_store_item(self._data_system.store, FEATURES, key)
         except Exception as e:
             log.error("Unexpected error while retrieving feature flag \"%s\": %s" % (key, repr(e)))
             log.debug(traceback.format_exc())
@@ -642,7 +641,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
             return FeatureFlagsState(False)

         if not self.is_initialized():
-            if self._store.initialized:
+            if self._data_system.store.initialized:
                 log.warning("all_flags_state() called before client has finished initializing! Using last known values from feature store")
             else:
                 log.warning("all_flags_state() called before client has finished initializing! Feature store unavailable - returning empty state")
@@ -657,7 +656,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
         with_reasons = kwargs.get('with_reasons', False)
         details_only_if_tracked = kwargs.get('details_only_for_tracked_flags', False)
         try:
-            flags_map = self._store.all(FEATURES, lambda x: x)
+            flags_map = self._data_system.store.all(FEATURES, lambda x: x)
             if flags_map is None:
                 raise ValueError("feature store error")
         except Exception as e:
diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py
index e5415039..4df2c32e 100644
--- a/ldclient/impl/datasourcev2/polling.py
+++ b/ldclient/impl/datasourcev2/polling.py
@@ -257,7 +257,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
         if self._config.payload_filter_key is not None:
             query_params["filter"] = self._config.payload_filter_key

-        if selector is not None:
+        if selector is not None and selector.is_defined():
             query_params["selector"] = selector.state

         uri = self._poll_uri
diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py
index 41df248b..91b5494e 100644
--- a/ldclient/impl/datasystem/fdv2.py
+++ b/ldclient/impl/datasystem/fdv2.py
@@ -89,7 +89,7 @@ def __update_availability(self, available: bool):
         if available:
             log.warning("Persistent store is available again")

-        status = DataStoreStatus(available, False)
+        status = DataStoreStatus(available, True)
         self.__store_update_sink.update_status(status)

         if available:
@@ -185,16 +185,18 @@ def __init__(
         self._change_set_listeners = Listeners()
         self._data_store_listeners = Listeners()

+        self._data_store_listeners.add(self._persistent_store_outage_recovery)
+
         # Create the store
         self._store = Store(self._flag_change_listeners, self._change_set_listeners)

         # Status providers
         self._data_source_status_provider = DataSourceStatusProviderImpl(Listeners())
-        self._data_store_status_provider = DataStoreStatusProviderImpl(None, Listeners())
+        self._data_store_status_provider = DataStoreStatusProviderImpl(None, self._data_store_listeners)

         # Configure persistent store if provided
         if self._data_system_config.data_store is not None:
-            self._data_store_status_provider = DataStoreStatusProviderImpl(self._data_system_config.data_store, Listeners())
+            self._data_store_status_provider = DataStoreStatusProviderImpl(self._data_system_config.data_store, self._data_store_listeners)
             writable = self._data_system_config.data_store_mode == DataStoreMode.READ_WRITE
             wrapper = FeatureStoreClientWrapper(self._data_system_config.data_store, self._data_store_status_provider)
             self._store.with_persistence(
@@ -509,6 +511,21 @@ def _recovery_condition(self, status: DataSourceStatus) -> bool:

         return interrupted_at_runtime or healthy_for_too_long or cannot_initialize

+    def _persistent_store_outage_recovery(self, data_store_status: DataStoreStatus):
+        """
+        Monitor the data store status. If the store comes online and
+        potentially has stale data, we should write our known state to it.
+        """
+        if not data_store_status.available:
+            return
+
+        if not data_store_status.stale:
+            return
+
+        err = self._store.commit()
+        if err is not None:
+            log.error("Failed to reinitialize data store", exc_info=err)
+
     @property
     def store(self) -> ReadOnlyStore:
         """Get the underlying store for flag evaluation."""
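Note: the recovery path above relies on two pieces cooperating: the wrapper now publishes stale=True when availability returns (the DataStoreStatus(available, True) change), and the new FDv2 listener commits the in-memory state in response. A minimal, self-contained sketch of that handshake, assuming only the DataStoreStatus and Listeners shapes visible in this diff (everything else is elided for illustration):

    from dataclasses import dataclass
    from typing import Callable, List


    @dataclass
    class DataStoreStatus:
        available: bool
        stale: bool  # True when the store's contents may be out of date


    class Listeners:
        """Tiny stand-in for ldclient.impl.listeners.Listeners."""

        def __init__(self) -> None:
            self._fns: List[Callable[[DataStoreStatus], None]] = []

        def add(self, fn: Callable[[DataStoreStatus], None]) -> None:
            self._fns.append(fn)

        def notify(self, status: DataStoreStatus) -> None:
            for fn in self._fns:
                fn(status)


    def recovery_listener(status: DataStoreStatus) -> None:
        # Mirrors _persistent_store_outage_recovery: act only when the store
        # is reachable again AND may have served stale data during the outage.
        if status.available and status.stale:
            print("store recovered with possibly-stale data -> commit in-memory state")


    listeners = Listeners()
    listeners.add(recovery_listener)

    # The wrapper reports the outage, then the recovery with stale=True,
    # which is what triggers the flush.
    listeners.notify(DataStoreStatus(available=False, stale=False))
    listeners.notify(DataStoreStatus(available=True, stale=True))
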
+ """ + if not data_store_status.available: + return + + if not data_store_status.stale: + return + + err = self._store.commit() + if err is not None: + log.error("Failed to reinitialize data store", exc_info=err) + @property def store(self) -> ReadOnlyStore: """Get the underlying store for flag evaluation.""" diff --git a/ldclient/impl/datasystem/protocolv2.py b/ldclient/impl/datasystem/protocolv2.py index e61f019e..c26ad746 100644 --- a/ldclient/impl/datasystem/protocolv2.py +++ b/ldclient/impl/datasystem/protocolv2.py @@ -505,7 +505,7 @@ def name(self) -> str: """Returns the name of the initializer.""" raise NotImplementedError - def sync(self, ss: "SelectorStore") -> "Generator[Update, None, None]": + def sync(self, ss: "SelectorStore") -> Generator["Update", None, None]: """ sync should begin the synchronization process for the data source, yielding Update objects until the connection is closed or an unrecoverable error diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index 20aea90e..49f0a70a 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -20,6 +20,7 @@ ) from ldclient.impl.dependency_tracker import DependencyTracker, KindAndKey from ldclient.impl.listeners import Listeners +from ldclient.impl.model.entity import ModelEntity from ldclient.impl.rwlock import ReadWriteLock from ldclient.impl.util import log from ldclient.interfaces import ( @@ -451,13 +452,19 @@ def commit(self) -> Optional[Exception]: Returns: Exception if commit failed, None otherwise """ + def __mapping_from_kind(kind: VersionedDataKind) -> Callable[[Dict[str, ModelEntity]], Dict[str, Dict[str, Any]]]: + def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]: + return {k: kind.encode(v) for k, v in data.items()} + + return __mapping + with self._lock: if self._should_persist(): try: # Get all data from memory store and write to persistent store all_data = {} for kind in [FEATURES, SEGMENTS]: - all_data[kind] = self._memory_store.all(kind, lambda x: x) + all_data[kind] = self._memory_store.all(kind, __mapping_from_kind(kind)) self._persistent_store.init(all_data) # type: ignore except Exception as e: return e diff --git a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py index 999f4d07..7f77da17 100644 --- a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py +++ b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py @@ -537,3 +537,245 @@ def test_no_persistent_store_status_provider_without_store(): assert set_on_ready.wait(1), "Data system did not become ready in time" fdv2.stop() + + +def test_persistent_store_outage_recovery_flushes_on_recovery(): + """Test that in-memory store is flushed to persistent store when it recovers from outage""" + from ldclient.interfaces import DataStoreStatus + + persistent_store = StubFeatureStore() + + # Create synchronizer with initial data + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True)) + + data_system_config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=persistent_store, + initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # Verify initial data is in the persistent store + snapshot = 
diff --git a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py
index 999f4d07..7f77da17 100644
--- a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py
+++ b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py
@@ -537,3 +537,245 @@ def test_no_persistent_store_status_provider_without_store():
     assert set_on_ready.wait(1), "Data system did not become ready in time"

     fdv2.stop()
+
+
+def test_persistent_store_outage_recovery_flushes_on_recovery():
+    """Test that in-memory store is flushed to persistent store when it recovers from outage"""
+    from ldclient.interfaces import DataStoreStatus
+
+    persistent_store = StubFeatureStore()
+
+    # Create synchronizer with initial data
+    td_synchronizer = TestDataV2.data_source()
+    td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))
+
+    data_system_config = DataSystemConfig(
+        data_store_mode=DataStoreMode.READ_WRITE,
+        data_store=persistent_store,
+        initializers=None,
+        primary_synchronizer=td_synchronizer.build_synchronizer,
+    )
+
+    set_on_ready = Event()
+    fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
+    fdv2.start(set_on_ready)
+
+    assert set_on_ready.wait(1), "Data system did not become ready in time"
+
+    # Verify initial data is in the persistent store
+    snapshot = persistent_store.get_data_snapshot()
+    assert "feature-flag" in snapshot[FEATURES]
+    assert snapshot[FEATURES]["feature-flag"]["on"] is True
+
+    # Reset tracking to isolate recovery behavior
+    persistent_store.reset_operation_tracking()
+
+    event = Event()
+    fdv2.flag_tracker.add_listener(lambda _flag_change: event.set())
+
+    # Simulate a new flag being added while store is "offline"
+    # (In reality, the store is still online, but we're testing the recovery mechanism)
+    td_synchronizer.update(td_synchronizer.flag("new-flag").on(False))
+
+    # Block until the flag has propagated through the data store
+    assert event.wait(1)
+
+    # Now simulate the persistent store coming back online with stale data
+    # by triggering the recovery callback directly
+    fdv2._persistent_store_outage_recovery(DataStoreStatus(available=True, stale=True))
+
+    # Verify that init was called on the persistent store (flushing in-memory data)
+    assert persistent_store.init_called_count > 0, "Store should have been reinitialized"
+
+    # Verify both flags are now in the persistent store
+    snapshot = persistent_store.get_data_snapshot()
+    assert "feature-flag" in snapshot[FEATURES]
+    assert "new-flag" in snapshot[FEATURES]
+
+    fdv2.stop()
+
+
+def test_persistent_store_outage_recovery_no_flush_when_not_stale():
+    """Test that recovery does NOT flush when store comes back online without stale data"""
+    from ldclient.interfaces import DataStoreStatus
+
+    persistent_store = StubFeatureStore()
+
+    td_synchronizer = TestDataV2.data_source()
+    td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))
+
+    data_system_config = DataSystemConfig(
+        data_store_mode=DataStoreMode.READ_WRITE,
+        data_store=persistent_store,
+        initializers=None,
+        primary_synchronizer=td_synchronizer.build_synchronizer,
+    )
+
+    set_on_ready = Event()
+    fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
+    fdv2.start(set_on_ready)
+
+    assert set_on_ready.wait(1), "Data system did not become ready in time"
+
+    # Reset tracking
+    persistent_store.reset_operation_tracking()
+
+    # Simulate store coming back online but NOT stale (data is fresh)
+    fdv2._persistent_store_outage_recovery(DataStoreStatus(available=True, stale=False))
+
+    # Verify that init was NOT called (no flush needed)
+    assert persistent_store.init_called_count == 0, "Store should not be reinitialized when not stale"
+
+    fdv2.stop()
+
+
+def test_persistent_store_outage_recovery_no_flush_when_unavailable():
+    """Test that recovery does NOT flush when store is unavailable"""
+    from ldclient.interfaces import DataStoreStatus
+
+    persistent_store = StubFeatureStore()
+
+    td_synchronizer = TestDataV2.data_source()
+    td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))
+
+    data_system_config = DataSystemConfig(
+        data_store_mode=DataStoreMode.READ_WRITE,
+        data_store=persistent_store,
+        initializers=None,
+        primary_synchronizer=td_synchronizer.build_synchronizer,
+    )
+
+    set_on_ready = Event()
+    fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
+    fdv2.start(set_on_ready)
+
+    assert set_on_ready.wait(1), "Data system did not become ready in time"
+
+    # Reset tracking
+    persistent_store.reset_operation_tracking()
+
+    # Simulate store being unavailable (even if marked as stale)
+    fdv2._persistent_store_outage_recovery(DataStoreStatus(available=False, stale=True))
+
+    # Verify that init was NOT called (store is not available)
+    assert persistent_store.init_called_count == 0, "Store should not be reinitialized when unavailable"
+
+    fdv2.stop()
+
+
+def test_persistent_store_commit_encodes_data_correctly():
+    """Test that Store.commit() properly encodes data before writing to persistent store"""
+    from ldclient.impl.datasystem.protocolv2 import (
+        Change,
+        ChangeSet,
+        ChangeType,
+        IntentCode,
+        ObjectKind
+    )
+    from ldclient.impl.datasystem.store import Store
+    from ldclient.impl.listeners import Listeners
+
+    persistent_store = StubFeatureStore()
+    store = Store(Listeners(), Listeners())
+    store.with_persistence(persistent_store, True, None)
+
+    # Create a flag with raw data
+    flag_data = {
+        "key": "test-flag",
+        "version": 1,
+        "on": True,
+        "variations": [True, False],
+        "fallthrough": {"variation": 0},
+    }
+
+    # Apply a changeset to add the flag to the in-memory store
+    changeset = ChangeSet(
+        intent_code=IntentCode.TRANSFER_FULL,
+        changes=[
+            Change(
+                action=ChangeType.PUT,
+                kind=ObjectKind.FLAG,
+                key="test-flag",
+                version=1,
+                object=flag_data,
+            )
+        ],
+        selector=None,
+    )
+    store.apply(changeset, True)
+
+    # Reset tracking
+    persistent_store.reset_operation_tracking()
+
+    # Now commit the in-memory store to the persistent store
+    err = store.commit()
+    assert err is None, "Commit should succeed"
+
+    # Verify that init was called with properly encoded data
+    assert persistent_store.init_called_count == 1, "Init should be called once"
+
+    # Verify the data in the persistent store is properly encoded
+    snapshot = persistent_store.get_data_snapshot()
+    assert "test-flag" in snapshot[FEATURES]
+
+    # The data should be in the encoded format (as a dict with all required fields)
+    flag_in_store = snapshot[FEATURES]["test-flag"]
+    assert flag_in_store["key"] == "test-flag"
+    assert flag_in_store["version"] == 1
+    assert flag_in_store["on"] is True
+
+
+def test_persistent_store_commit_with_no_persistent_store():
+    """Test that Store.commit() safely handles the case where there's no persistent store"""
+    from ldclient.impl.datasystem.store import Store
+    from ldclient.impl.listeners import Listeners
+
+    # Create store without persistent store
+    store = Store(Listeners(), Listeners())
+
+    # Commit should succeed but do nothing
+    err = store.commit()
+    assert err is None, "Commit should succeed even without persistent store"
+
+
+def test_persistent_store_commit_handles_errors():
+    """Test that Store.commit() handles errors from persistent store gracefully"""
+    from ldclient.impl.datasystem.protocolv2 import (
+        Change,
+        ChangeSet,
+        ChangeType,
+        IntentCode,
+        ObjectKind
+    )
+    from ldclient.impl.datasystem.store import Store
+    from ldclient.impl.listeners import Listeners
+
+    class FailingFeatureStore(StubFeatureStore):
+        """A feature store that always fails on init"""
+
+        def init(self, all_data):
+            raise RuntimeError("Simulated persistent store failure")
+
+    persistent_store = FailingFeatureStore()
+    store = Store(Listeners(), Listeners())
+    store.with_persistence(persistent_store, True, None)
+
+    # Add some data to the in-memory store
+    changeset = ChangeSet(
+        intent_code=IntentCode.TRANSFER_FULL,
+        changes=[
+            Change(
+                action=ChangeType.PUT,
+                kind=ObjectKind.FLAG,
+                key="test-flag",
+                version=1,
+                object={"key": "test-flag", "version": 1, "on": True},
+            )
+        ],
+        selector=None,
+    )
+    store.apply(changeset, True)
+
+    # Commit should return the error without raising
+    err = store.commit()
+    assert err is not None, "Commit should return error from persistent store"
+    assert isinstance(err, RuntimeError)
+    assert str(err) == "Simulated persistent store failure"
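
Note: the tests above lean on StubFeatureStore, which is defined earlier in this test module and is not part of this diff. For context, a hypothetical sketch of the tracking surface the new tests assume (get_data_snapshot(), init_called_count, reset_operation_tracking()); the real stub also implements the rest of the FeatureStore interface:

    class StubFeatureStore:  # hypothetical sketch; the real one lives in the test module
        def __init__(self):
            self._data = {}
            self.init_called_count = 0

        def init(self, all_data):
            # Record the write and replace the stored contents wholesale.
            self.init_called_count += 1
            self._data = {kind: dict(items) for kind, items in all_data.items()}

        def reset_operation_tracking(self):
            self.init_called_count = 0

        def get_data_snapshot(self):
            # Copy per kind so assertions cannot mutate the store's state.
            return {kind: dict(items) for kind, items in self._data.items()}

        # get / upsert / initialized etc. elided -- see the full test module.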