Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,13 @@ def __start_up(self, start_wait: float):
self._data_system.data_source_status_provider
)
self.__flag_tracker = self._data_system.flag_tracker
self._store: ReadOnlyStore = self._data_system.store

big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
self.__big_segment_store_manager = big_segment_store_manager

self._evaluator = Evaluator(
lambda key: _get_store_item(self._store, FEATURES, key),
lambda key: _get_store_item(self._store, SEGMENTS, key),
lambda key: _get_store_item(self._data_system.store, FEATURES, key),
lambda key: _get_store_item(self._data_system.store, SEGMENTS, key),
lambda key: big_segment_store_manager.get_user_membership(key),
log,
)
Expand Down Expand Up @@ -571,7 +570,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
return EvaluationDetail(default, None, error_reason('CLIENT_NOT_READY')), None

if not self.is_initialized():
if self._store.initialized:
if self._data_system.store.initialized:
log.warning("Feature Flag evaluation attempted before client has initialized - using last known values from feature store for feature key: " + key)
else:
log.warning("Feature Flag evaluation attempted before client has initialized! Feature store unavailable - returning default: " + str(default) + " for feature key: " + key)
Expand All @@ -584,7 +583,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
return EvaluationDetail(default, None, error_reason('USER_NOT_SPECIFIED')), None

try:
flag = _get_store_item(self._store, FEATURES, key)
flag = _get_store_item(self._data_system.store, FEATURES, key)
except Exception as e:
log.error("Unexpected error while retrieving feature flag \"%s\": %s" % (key, repr(e)))
log.debug(traceback.format_exc())
Expand Down Expand Up @@ -642,7 +641,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
return FeatureFlagsState(False)

if not self.is_initialized():
if self._store.initialized:
if self._data_system.store.initialized:
log.warning("all_flags_state() called before client has finished initializing! Using last known values from feature store")
else:
log.warning("all_flags_state() called before client has finished initializing! Feature store unavailable - returning empty state")
Expand All @@ -657,7 +656,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
with_reasons = kwargs.get('with_reasons', False)
details_only_if_tracked = kwargs.get('details_only_for_tracked_flags', False)
try:
flags_map = self._store.all(FEATURES, lambda x: x)
flags_map = self._data_system.store.all(FEATURES, lambda x: x)
if flags_map is None:
raise ValueError("feature store error")
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
if self._config.payload_filter_key is not None:
query_params["filter"] = self._config.payload_filter_key

if selector is not None:
if selector is not None and selector.is_defined():
query_params["selector"] = selector.state

uri = self._poll_uri
Expand Down
23 changes: 20 additions & 3 deletions ldclient/impl/datasystem/fdv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __update_availability(self, available: bool):
if available:
log.warning("Persistent store is available again")

status = DataStoreStatus(available, False)
status = DataStoreStatus(available, True)
self.__store_update_sink.update_status(status)

if available:
Expand Down Expand Up @@ -185,16 +185,18 @@ def __init__(
self._change_set_listeners = Listeners()
self._data_store_listeners = Listeners()

self._data_store_listeners.add(self._persistent_store_outage_recovery)

# Create the store
self._store = Store(self._flag_change_listeners, self._change_set_listeners)

# Status providers
self._data_source_status_provider = DataSourceStatusProviderImpl(Listeners())
self._data_store_status_provider = DataStoreStatusProviderImpl(None, Listeners())
self._data_store_status_provider = DataStoreStatusProviderImpl(None, self._data_store_listeners)

# Configure persistent store if provided
if self._data_system_config.data_store is not None:
self._data_store_status_provider = DataStoreStatusProviderImpl(self._data_system_config.data_store, Listeners())
self._data_store_status_provider = DataStoreStatusProviderImpl(self._data_system_config.data_store, self._data_store_listeners)
writable = self._data_system_config.data_store_mode == DataStoreMode.READ_WRITE
wrapper = FeatureStoreClientWrapper(self._data_system_config.data_store, self._data_store_status_provider)
self._store.with_persistence(
Expand Down Expand Up @@ -509,6 +511,21 @@ def _recovery_condition(self, status: DataSourceStatus) -> bool:

return interrupted_at_runtime or healthy_for_too_long or cannot_initialize

def _persistent_store_outage_recovery(self, data_store_status: DataStoreStatus):
"""
Monitor the data store status. If the store comes online and
potentially has stale data, we should write our known state to it.
"""
if not data_store_status.available:
return

if not data_store_status.stale:
return

err = self._store.commit()
if err is not None:
log.error("Failed to reinitialize data store", exc_info=err)

@property
def store(self) -> ReadOnlyStore:
"""Get the underlying store for flag evaluation."""
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/datasystem/protocolv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ def name(self) -> str:
"""Returns the name of the initializer."""
raise NotImplementedError

def sync(self, ss: "SelectorStore") -> "Generator[Update, None, None]":
def sync(self, ss: "SelectorStore") -> Generator["Update", None, None]:
"""
sync should begin the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
Expand Down
9 changes: 8 additions & 1 deletion ldclient/impl/datasystem/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from ldclient.impl.dependency_tracker import DependencyTracker, KindAndKey
from ldclient.impl.listeners import Listeners
from ldclient.impl.model.entity import ModelEntity
from ldclient.impl.rwlock import ReadWriteLock
from ldclient.impl.util import log
from ldclient.interfaces import (
Expand Down Expand Up @@ -451,13 +452,19 @@ def commit(self) -> Optional[Exception]:
Returns:
Exception if commit failed, None otherwise
"""
def __mapping_from_kind(kind: VersionedDataKind) -> Callable[[Dict[str, ModelEntity]], Dict[str, Dict[str, Any]]]:
def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]:
return {k: kind.encode(v) for k, v in data.items()}

return __mapping

with self._lock:
if self._should_persist():
try:
# Get all data from memory store and write to persistent store
all_data = {}
for kind in [FEATURES, SEGMENTS]:
all_data[kind] = self._memory_store.all(kind, lambda x: x)
all_data[kind] = self._memory_store.all(kind, __mapping_from_kind(kind))
self._persistent_store.init(all_data) # type: ignore
except Exception as e:
return e
Expand Down
242 changes: 242 additions & 0 deletions ldclient/testing/impl/datasystem/test_fdv2_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,245 @@ def test_no_persistent_store_status_provider_without_store():
assert set_on_ready.wait(1), "Data system did not become ready in time"

fdv2.stop()


def test_persistent_store_outage_recovery_flushes_on_recovery():
"""Test that in-memory store is flushed to persistent store when it recovers from outage"""
from ldclient.interfaces import DataStoreStatus

persistent_store = StubFeatureStore()

# Create synchronizer with initial data
td_synchronizer = TestDataV2.data_source()
td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))

data_system_config = DataSystemConfig(
data_store_mode=DataStoreMode.READ_WRITE,
data_store=persistent_store,
initializers=None,
primary_synchronizer=td_synchronizer.build_synchronizer,
)

set_on_ready = Event()
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
fdv2.start(set_on_ready)

assert set_on_ready.wait(1), "Data system did not become ready in time"

# Verify initial data is in the persistent store
snapshot = persistent_store.get_data_snapshot()
assert "feature-flag" in snapshot[FEATURES]
assert snapshot[FEATURES]["feature-flag"]["on"] is True

# Reset tracking to isolate recovery behavior
persistent_store.reset_operation_tracking()

event = Event()
fdv2.flag_tracker.add_listener(lambda _flag_change: event.set())
# Simulate a new flag being added while store is "offline"
# (In reality, the store is still online, but we're testing the recovery mechanism)
td_synchronizer.update(td_synchronizer.flag("new-flag").on(False))

# Block until the flag has propagated through the data store
assert event.wait(1)

# Now simulate the persistent store coming back online with stale data
# by triggering the recovery callback directly
fdv2._persistent_store_outage_recovery(DataStoreStatus(available=True, stale=True))

# Verify that init was called on the persistent store (flushing in-memory data)
assert persistent_store.init_called_count > 0, "Store should have been reinitialized"

# Verify both flags are now in the persistent store
snapshot = persistent_store.get_data_snapshot()
assert "feature-flag" in snapshot[FEATURES]
assert "new-flag" in snapshot[FEATURES]

fdv2.stop()


def test_persistent_store_outage_recovery_no_flush_when_not_stale():
"""Test that recovery does NOT flush when store comes back online without stale data"""
from ldclient.interfaces import DataStoreStatus

persistent_store = StubFeatureStore()

td_synchronizer = TestDataV2.data_source()
td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))

data_system_config = DataSystemConfig(
data_store_mode=DataStoreMode.READ_WRITE,
data_store=persistent_store,
initializers=None,
primary_synchronizer=td_synchronizer.build_synchronizer,
)

set_on_ready = Event()
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
fdv2.start(set_on_ready)

assert set_on_ready.wait(1), "Data system did not become ready in time"

# Reset tracking
persistent_store.reset_operation_tracking()

# Simulate store coming back online but NOT stale (data is fresh)
fdv2._persistent_store_outage_recovery(DataStoreStatus(available=True, stale=False))

# Verify that init was NOT called (no flush needed)
assert persistent_store.init_called_count == 0, "Store should not be reinitialized when not stale"

fdv2.stop()


def test_persistent_store_outage_recovery_no_flush_when_unavailable():
"""Test that recovery does NOT flush when store is unavailable"""
from ldclient.interfaces import DataStoreStatus

persistent_store = StubFeatureStore()

td_synchronizer = TestDataV2.data_source()
td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))

data_system_config = DataSystemConfig(
data_store_mode=DataStoreMode.READ_WRITE,
data_store=persistent_store,
initializers=None,
primary_synchronizer=td_synchronizer.build_synchronizer,
)

set_on_ready = Event()
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
fdv2.start(set_on_ready)

assert set_on_ready.wait(1), "Data system did not become ready in time"

# Reset tracking
persistent_store.reset_operation_tracking()

# Simulate store being unavailable (even if marked as stale)
fdv2._persistent_store_outage_recovery(DataStoreStatus(available=False, stale=True))

# Verify that init was NOT called (store is not available)
assert persistent_store.init_called_count == 0, "Store should not be reinitialized when unavailable"

fdv2.stop()


def test_persistent_store_commit_encodes_data_correctly():
"""Test that Store.commit() properly encodes data before writing to persistent store"""
from ldclient.impl.datasystem.protocolv2 import (
Change,
ChangeSet,
ChangeType,
IntentCode,
ObjectKind
)
from ldclient.impl.datasystem.store import Store
from ldclient.impl.listeners import Listeners

persistent_store = StubFeatureStore()
store = Store(Listeners(), Listeners())
store.with_persistence(persistent_store, True, None)

# Create a flag with raw data
flag_data = {
"key": "test-flag",
"version": 1,
"on": True,
"variations": [True, False],
"fallthrough": {"variation": 0},
}

# Apply a changeset to add the flag to the in-memory store
changeset = ChangeSet(
intent_code=IntentCode.TRANSFER_FULL,
changes=[
Change(
action=ChangeType.PUT,
kind=ObjectKind.FLAG,
key="test-flag",
version=1,
object=flag_data,
)
],
selector=None,
)
store.apply(changeset, True)

# Reset tracking
persistent_store.reset_operation_tracking()

# Now commit the in-memory store to the persistent store
err = store.commit()
assert err is None, "Commit should succeed"

# Verify that init was called with properly encoded data
assert persistent_store.init_called_count == 1, "Init should be called once"

# Verify the data in the persistent store is properly encoded
snapshot = persistent_store.get_data_snapshot()
assert "test-flag" in snapshot[FEATURES]

# The data should be in the encoded format (as a dict with all required fields)
flag_in_store = snapshot[FEATURES]["test-flag"]
assert flag_in_store["key"] == "test-flag"
assert flag_in_store["version"] == 1
assert flag_in_store["on"] is True


def test_persistent_store_commit_with_no_persistent_store():
"""Test that Store.commit() safely handles the case where there's no persistent store"""
from ldclient.impl.datasystem.store import Store
from ldclient.impl.listeners import Listeners

# Create store without persistent store
store = Store(Listeners(), Listeners())

# Commit should succeed but do nothing
err = store.commit()
assert err is None, "Commit should succeed even without persistent store"


def test_persistent_store_commit_handles_errors():
"""Test that Store.commit() handles errors from persistent store gracefully"""
from ldclient.impl.datasystem.protocolv2 import (
Change,
ChangeSet,
ChangeType,
IntentCode,
ObjectKind
)
from ldclient.impl.datasystem.store import Store
from ldclient.impl.listeners import Listeners

class FailingFeatureStore(StubFeatureStore):
"""A feature store that always fails on init"""
def init(self, all_data):
raise RuntimeError("Simulated persistent store failure")

persistent_store = FailingFeatureStore()
store = Store(Listeners(), Listeners())
store.with_persistence(persistent_store, True, None)

# Add some data to the in-memory store
changeset = ChangeSet(
intent_code=IntentCode.TRANSFER_FULL,
changes=[
Change(
action=ChangeType.PUT,
kind=ObjectKind.FLAG,
key="test-flag",
version=1,
object={"key": "test-flag", "version": 1, "on": True},
)
],
selector=None,
)
store.apply(changeset, True)

# Commit should return the error without raising
err = store.commit()
assert err is not None, "Commit should return error from persistent store"
assert isinstance(err, RuntimeError)
assert str(err) == "Simulated persistent store failure"