diff --git a/ldclient/client.py b/ldclient/client.py index 091b064f..6c3269ad 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -30,6 +30,8 @@ DataStoreStatusProviderImpl, DataStoreUpdateSinkImpl ) +from ldclient.impl.datasystem import DataAvailability, DataSystem +from ldclient.impl.datasystem.fdv2 import FDv2 from ldclient.impl.evaluator import Evaluator, error_reason from ldclient.impl.events.diagnostics import ( _DiagnosticAccumulator, @@ -249,14 +251,19 @@ def __start_up(self, start_wait: float): self.__hooks_lock = ReadWriteLock() self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook] - # Initialize data system (FDv1) to encapsulate v1 data plumbing - from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency - FDv1 - ) + datasystem_config = self._config.datasystem_config + if datasystem_config is None: + # Initialize data system (FDv1) to encapsulate v1 data plumbing + from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency + FDv1 + ) + + self._data_system: DataSystem = FDv1(self._config) + else: + self._data_system = FDv2(datasystem_config, disabled=self._config.offline) - self._data_system = FDv1(self._config) # Provide flag evaluation function for value-change tracking - self._data_system.set_flag_value_eval_fn( + self._data_system.set_flag_value_eval_fn( # type: ignore lambda key, context: self.variation(key, context, None) ) # Expose providers and store from data system @@ -265,7 +272,7 @@ def __start_up(self, start_wait: float): self._data_system.data_source_status_provider ) self.__flag_tracker = self._data_system.flag_tracker - self._store = self._data_system.store # type: FeatureStore + self._store: FeatureStore = self._data_system.store # type: ignore big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments) self.__big_segment_store_manager = big_segment_store_manager @@ -286,7 +293,7 @@ def __start_up(self, start_wait: float): 
diagnostic_accumulator = self._set_event_processor(self._config) # Pass diagnostic accumulator to data system for streaming metrics - self._data_system.set_diagnostic_accumulator(diagnostic_accumulator) + self._data_system.set_diagnostic_accumulator(diagnostic_accumulator) # type: ignore self.__register_plugins(environment_metadata) @@ -475,11 +482,7 @@ def is_initialized(self) -> bool: if self.is_offline() or self._config.use_ldd: return True - return ( - self._data_system._update_processor.initialized() - if self._data_system._update_processor - else False - ) + return self._data_system.data_availability.at_least(DataAvailability.CACHED) def flush(self): """Flushes all pending analytics events. diff --git a/ldclient/config.py b/ldclient/config.py index fbc88ac8..af5e62b7 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -4,11 +4,13 @@ Note that the same class can also be imported from the ``ldclient.client`` submodule. """ +from dataclasses import dataclass from threading import Event -from typing import Callable, List, Optional, Set +from typing import Callable, List, Optional, Set, TypeVar from ldclient.feature_store import InMemoryFeatureStore from ldclient.hook import Hook +from ldclient.impl.datasystem import Initializer, Synchronizer from ldclient.impl.util import ( log, validate_application_info, @@ -152,6 +154,32 @@ def disable_ssl_verification(self) -> bool: return self.__disable_ssl_verification +T = TypeVar("T") + +Builder = Callable[[], T] + + +@dataclass(frozen=True) +class DataSystemConfig: + """ + Configuration for LaunchDarkly's data acquisition strategy. 
+ """ + + initializers: Optional[List[Builder[Initializer]]] + """The initializers for the data system.""" + + primary_synchronizer: Builder[Synchronizer] + """The primary synchronizer for the data system.""" + + secondary_synchronizer: Optional[Builder[Synchronizer]] = None + """The secondary synchronizers for the data system.""" + + # TODO(fdv2): Implement this synchronizer up and hook it up everywhere. + # TODO(fdv2): Remove this when FDv2 is fully launched + fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None + """An optional fallback synchronizer that will read from FDv1""" + + class Config: """Advanced configuration options for the SDK client. @@ -194,6 +222,7 @@ def __init__( enable_event_compression: bool = False, omit_anonymous_contexts: bool = False, payload_filter_key: Optional[str] = None, + datasystem_config: Optional[DataSystemConfig] = None, ): """ :param sdk_key: The SDK key for your LaunchDarkly account. This is always required. @@ -264,6 +293,7 @@ def __init__( :param enable_event_compression: Whether or not to enable GZIP compression for outgoing events. :param omit_anonymous_contexts: Sets whether anonymous contexts should be omitted from index and identify events. :param payload_filter_key: The payload filter is used to selectively limited the flags and segments delivered in the data source payload. + :param datasystem_config: Configuration for the upcoming enhanced data system design. This is experimental and should not be set without direction from LaunchDarkly support. """ self.__sdk_key = validate_sdk_key_format(sdk_key, log) @@ -303,6 +333,7 @@ def __init__( self.__payload_filter_key = payload_filter_key self._data_source_update_sink: Optional[DataSourceUpdateSink] = None self._instance_id: Optional[str] = None + self._datasystem_config = datasystem_config def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config': """Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key. 
@@ -546,6 +577,15 @@ def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]: """ return self._data_source_update_sink + @property + def datasystem_config(self) -> Optional[DataSystemConfig]: + """ + Configuration for the upcoming enhanced data system design. This is + experimental and should not be set without direction from LaunchDarkly + support. + """ + return self._datasystem_config + def _validate(self): if self.offline is False and self.sdk_key == '': log.warning("Missing or blank SDK key") diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index 224f49c5..c77ff8b4 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -90,6 +90,7 @@ def __init__( "ldclient.datasource.polling", poll_interval, 0, self._poll ) + @property def name(self) -> str: """Returns the name of the initializer.""" return "PollingDataSourceV2" diff --git a/ldclient/impl/datasourcev2/status.py b/ldclient/impl/datasourcev2/status.py new file mode 100644 index 00000000..ca384415 --- /dev/null +++ b/ldclient/impl/datasourcev2/status.py @@ -0,0 +1,57 @@ +import time +from typing import Callable, Optional + +from ldclient.impl.listeners import Listeners +from ldclient.impl.rwlock import ReadWriteLock +from ldclient.interfaces import ( + DataSourceErrorInfo, + DataSourceState, + DataSourceStatus, + DataSourceStatusProvider +) + + +class DataSourceStatusProviderImpl(DataSourceStatusProvider): + def __init__(self, listeners: Listeners): + self.__listeners = listeners + self.__status = DataSourceStatus(DataSourceState.INITIALIZING, 0, None) + self.__lock = ReadWriteLock() + + @property + def status(self) -> DataSourceStatus: + self.__lock.rlock() + status = self.__status + self.__lock.runlock() + + return status + + def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): + status_to_broadcast = None + + try: + self.__lock.lock() + old_status = self.__status + + if 
new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: + new_state = DataSourceState.INITIALIZING + + if new_state == old_status.state and new_error is None: + return + + new_since = self.__status.since if new_state == self.__status.state else time.time() + new_error = self.__status.error if new_error is None else new_error + + self.__status = DataSourceStatus(new_state, new_since, new_error) + + status_to_broadcast = self.__status + finally: + self.__lock.unlock() + + if status_to_broadcast is not None: + self.__listeners.notify(status_to_broadcast) + + def add_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.remove(listener) diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 03ea68ff..808b5238 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -129,6 +129,13 @@ def __init__( self._config = config self._sse: Optional[SSEClient] = None + @property + def name(self) -> str: + """ + Returns the name of the synchronizer, which is used for logging and debugging. 
+ """ + return "streaming" + def sync(self) -> Generator[Update, None, None]: """ sync should begin the synchronization process for the data source, yielding diff --git a/ldclient/impl/datastore/status.py b/ldclient/impl/datastore/status.py index a8dd5ee3..ee9797dd 100644 --- a/ldclient/impl/datastore/status.py +++ b/ldclient/impl/datastore/status.py @@ -1,7 +1,7 @@ from __future__ import annotations from copy import copy -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Callable, Protocol from ldclient.impl.listeners import Listeners from ldclient.impl.rwlock import ReadWriteLock diff --git a/ldclient/impl/datasystem/__init__.py b/ldclient/impl/datasystem/__init__.py index 9c5bf6d6..15b9e8f0 100644 --- a/ldclient/impl/datasystem/__init__.py +++ b/ldclient/impl/datasystem/__init__.py @@ -156,6 +156,14 @@ class Initializer(Protocol): # pylint: disable=too-few-public-methods as new changes occur. """ + @property + @abstractmethod + def name(self) -> str: + """ + Returns the name of the initializer, which is used for logging and debugging. + """ + raise NotImplementedError + @abstractmethod def fetch(self) -> BasisResult: """ @@ -188,6 +196,13 @@ class Synchronizer(Protocol): # pylint: disable=too-few-public-methods of the data source, including any changes that have occurred since the last synchronization. """ + @property + @abstractmethod + def name(self) -> str: + """ + Returns the name of the synchronizer, which is used for logging and debugging. + """ + raise NotImplementedError @abstractmethod def sync(self) -> Generator[Update, None, None]: diff --git a/ldclient/impl/datasystem/config.py b/ldclient/impl/datasystem/config.py index c0e66d6b..e9c42efd 100644 --- a/ldclient/impl/datasystem/config.py +++ b/ldclient/impl/datasystem/config.py @@ -2,10 +2,10 @@ Configuration for LaunchDarkly's data acquisition strategy. 
""" -from dataclasses import dataclass from typing import Callable, List, Optional, TypeVar from ldclient.config import Config as LDConfig +from ldclient.config import DataSystemConfig from ldclient.impl.datasourcev2.polling import ( PollingDataSource, PollingDataSourceBuilder, @@ -22,22 +22,6 @@ Builder = Callable[[], T] -@dataclass(frozen=True) -class Config: - """ - Configuration for LaunchDarkly's data acquisition strategy. - """ - - initializers: Optional[List[Builder[Initializer]]] - """The initializers for the data system.""" - - primary_synchronizer: Builder[Synchronizer] - """The primary synchronizer for the data system.""" - - secondary_synchronizer: Optional[Builder[Synchronizer]] - """The secondary synchronizers for the data system.""" - - class ConfigBuilder: # pylint: disable=too-few-public-methods """ Builder for the data system configuration. @@ -47,7 +31,7 @@ class ConfigBuilder: # pylint: disable=too-few-public-methods _primary_synchronizer: Optional[Builder[Synchronizer]] = None _secondary_synchronizer: Optional[Builder[Synchronizer]] = None - def initializers(self, initializers: List[Builder[Initializer]]) -> "ConfigBuilder": + def initializers(self, initializers: Optional[List[Builder[Initializer]]]) -> "ConfigBuilder": """ Sets the initializers for the data system. """ @@ -66,14 +50,14 @@ def synchronizers( self._secondary_synchronizer = secondary return self - def build(self) -> Config: + def build(self) -> DataSystemConfig: """ Builds the data system configuration. """ if self._primary_synchronizer is None: raise ValueError("Primary synchronizer must be set") - return Config( + return DataSystemConfig( initializers=self._initializers, primary_synchronizer=self._primary_synchronizer, secondary_synchronizer=self._secondary_synchronizer, @@ -144,7 +128,7 @@ def polling(config: LDConfig) -> ConfigBuilder: streaming, but may be necessary in some network environments. 
""" - polling_builder = __polling_ds_builder(config) + polling_builder: Builder[Synchronizer] = __polling_ds_builder(config) builder = ConfigBuilder() builder.synchronizers(polling_builder) diff --git a/ldclient/impl/datasystem/fdv1.py b/ldclient/impl/datasystem/fdv1.py index d291aba3..e45498e2 100644 --- a/ldclient/impl/datasystem/fdv1.py +++ b/ldclient/impl/datasystem/fdv1.py @@ -142,7 +142,16 @@ def flag_tracker(self) -> FlagTracker: @property def data_availability(self) -> DataAvailability: - return self._data_availability + if self._config.offline: + return DataAvailability.DEFAULTS + + if self._update_processor is not None and self._update_processor.initialized(): + return DataAvailability.REFRESHED + + if self._store_wrapper.initialized: + return DataAvailability.CACHED + + return DataAvailability.DEFAULTS @property def target_availability(self) -> DataAvailability: diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py new file mode 100644 index 00000000..cfb61750 --- /dev/null +++ b/ldclient/impl/datasystem/fdv2.py @@ -0,0 +1,415 @@ +import time +from threading import Event, Thread +from typing import Callable, List, Optional + +from ldclient.config import Builder, DataSystemConfig +from ldclient.impl.datasourcev2.status import DataSourceStatusProviderImpl +from ldclient.impl.datasystem import DataAvailability, Synchronizer +from ldclient.impl.datasystem.store import Store +from ldclient.impl.flag_tracker import FlagTrackerImpl +from ldclient.impl.listeners import Listeners +from ldclient.impl.util import _Fail +from ldclient.interfaces import ( + DataSourceState, + DataSourceStatus, + DataSourceStatusProvider, + DataStoreStatusProvider, + FeatureStore, + FlagTracker +) + + +class FDv2: + """ + FDv2 is an implementation of the DataSystem interface that uses the Flag Delivery V2 protocol + for obtaining and keeping data up-to-date. Additionally, it operates with an optional persistent + store in read-only or read/write mode. 
+ """ + + def __init__( + self, + config: DataSystemConfig, + # # TODO: These next 2 parameters should be moved into the Config. + # persistent_store: Optional[FeatureStore] = None, + # store_writable: bool = True, + disabled: bool = False, + ): + """ + Initialize a new FDv2 data system. + + :param config: Configuration for initializers and synchronizers + :param persistent_store: Optional persistent store for data persistence + :param store_writable: Whether the persistent store should be written to + :param disabled: Whether the data system is disabled (offline mode) + """ + self._config = config + self._primary_synchronizer_builder: Optional[Builder[Synchronizer]] = config.primary_synchronizer + self._secondary_synchronizer_builder = config.secondary_synchronizer + self._fdv1_fallback_synchronizer_builder = config.fdv1_fallback_synchronizer + self._disabled = disabled + + # Diagnostic accumulator provided by client for streaming metrics + # TODO(fdv2): Either we need to use this, or we need to provide it to + # the streaming synchronizers + self._diagnostic_accumulator = None + + # Set up event listeners + self._flag_change_listeners = Listeners() + self._change_set_listeners = Listeners() + + # Create the store + self._store = Store(self._flag_change_listeners, self._change_set_listeners) + + # Status providers + self._data_source_status_provider = DataSourceStatusProviderImpl(Listeners()) + + # # Configure persistent store if provided + # if persistent_store is not None: + # self._store.with_persistence( + # persistent_store, store_writable, self._data_source_status_provider + # ) + # + # Flag tracker (evaluation function set later by client) + self._flag_tracker = FlagTrackerImpl( + self._flag_change_listeners, + lambda key, context: None # Placeholder, replaced by client + ) + + # Threading + self._stop_event = Event() + self._threads: List[Thread] = [] + + # Track configuration + # TODO: What is the point of checking if primary_synchronizer is not + # None? 
Doesn't it have to be set? + self._configured_with_data_sources = ( + (config.initializers is not None and len(config.initializers) > 0) + or config.primary_synchronizer is not None + ) + + def start(self, set_on_ready: Event): + """ + Start the FDv2 data system. + + :param set_on_ready: Event to set when the system is ready or has failed + """ + if self._disabled: + print("Data system is disabled, SDK will return application-defined default values") + set_on_ready.set() + return + + self._stop_event.clear() + + # Start the main coordination thread + main_thread = Thread( + target=self._run_main_loop, + args=(set_on_ready,), + name="FDv2-main", + daemon=True + ) + main_thread.start() + self._threads.append(main_thread) + + def stop(self): + """Stop the FDv2 data system and all associated threads.""" + self._stop_event.set() + + # Wait for all threads to complete + for thread in self._threads: + if thread.is_alive(): + thread.join(timeout=5.0) # 5 second timeout + + # Close the store + self._store.close() + + def set_diagnostic_accumulator(self, diagnostic_accumulator): + """ + Sets the diagnostic accumulator for streaming initialization metrics. + This should be called before start() to ensure metrics are collected. 
+ """ + self._diagnostic_accumulator = diagnostic_accumulator + + def _run_main_loop(self, set_on_ready: Event): + """Main coordination loop that manages initializers and synchronizers.""" + try: + self._data_source_status_provider.update_status( + DataSourceState.INITIALIZING, None + ) + + # Run initializers first + self._run_initializers(set_on_ready) + + # # If we have persistent store with status monitoring, start recovery monitoring + # if ( + # self._configured_with_data_sources + # and self._data_store_status_provider is not None + # and hasattr(self._data_store_status_provider, 'add_listener') + # ): + # recovery_thread = Thread( + # target=self._run_persistent_store_outage_recovery, + # name="FDv2-store-recovery", + # daemon=True + # ) + # recovery_thread.start() + # self._threads.append(recovery_thread) + + # Run synchronizers + self._run_synchronizers(set_on_ready) + + except Exception as e: + print(f"Error in FDv2 main loop: {e}") + # Ensure ready event is set even on error + if not set_on_ready.is_set(): + set_on_ready.set() + + def _run_initializers(self, set_on_ready: Event): + """Run initializers to get initial data.""" + if self._config.initializers is None: + return + + for initializer_builder in self._config.initializers: + if self._stop_event.is_set(): + return + + try: + initializer = initializer_builder() + print(f"Attempting to initialize via {initializer.name}") + + basis_result = initializer.fetch() + + if isinstance(basis_result, _Fail): + print(f"Initializer {initializer.name} failed: {basis_result.error}") + continue + + basis = basis_result.value + print(f"Initialized via {initializer.name}") + + # Apply the basis to the store + self._store.apply(basis.change_set, basis.persist) + + # Set ready event + if not set_on_ready.is_set(): + set_on_ready.set() + except Exception as e: + print(f"Initializer failed with exception: {e}") + + def _run_synchronizers(self, set_on_ready: Event): + """Run synchronizers to keep data up-to-date.""" + # 
If no primary synchronizer configured, just set ready and return + if self._config.primary_synchronizer is None: + if not set_on_ready.is_set(): + set_on_ready.set() + return + + def synchronizer_loop(self: 'FDv2'): + try: + # Always ensure ready event is set when we exit + while not self._stop_event.is_set() and self._primary_synchronizer_builder is not None: + # Try primary synchronizer + try: + primary_sync = self._primary_synchronizer_builder() + print(f"Primary synchronizer {primary_sync.name} is starting") + + remove_sync, fallback_v1 = self._consume_synchronizer_results( + primary_sync, set_on_ready, self._fallback_condition + ) + + if remove_sync: + self._primary_synchronizer_builder = self._secondary_synchronizer_builder + self._secondary_synchronizer_builder = None + + if fallback_v1: + self._primary_synchronizer_builder = self._fdv1_fallback_synchronizer_builder + + if self._primary_synchronizer_builder is None: + print("No more synchronizers available") + self._data_source_status_provider.update_status( + DataSourceState.OFF, + self._data_source_status_provider.status.error + ) + break + else: + print("Fallback condition met") + + if self._secondary_synchronizer_builder is None: + continue + + secondary_sync = self._secondary_synchronizer_builder() + print(f"Secondary synchronizer {secondary_sync.name} is starting") + + remove_sync, fallback_v1 = self._consume_synchronizer_results( + secondary_sync, set_on_ready, self._recovery_condition + ) + + if remove_sync: + self._secondary_synchronizer_builder = None + if fallback_v1: + self._primary_synchronizer_builder = self._fdv1_fallback_synchronizer_builder + + if self._primary_synchronizer_builder is None: + print("No more synchronizers available") + self._data_source_status_provider.update_status( + DataSourceState.OFF, + self._data_source_status_provider.status.error + ) + # TODO: WE might need to also set that threading.Event here + break + + print("Recovery condition met, returning to primary 
synchronizer") + except Exception as e: + print(f"Failed to build primary synchronizer: {e}") + break + + except Exception as e: + print(f"Error in synchronizer loop: {e}") + finally: + # Ensure we always set the ready event when exiting + if not set_on_ready.is_set(): + set_on_ready.set() + + sync_thread = Thread( + target=synchronizer_loop, + name="FDv2-synchronizers", + args=(self,), + daemon=True + ) + sync_thread.start() + self._threads.append(sync_thread) + + def _consume_synchronizer_results( + self, + synchronizer: Synchronizer, + set_on_ready: Event, + condition_func: Callable[[DataSourceStatus], bool] + ) -> tuple[bool, bool]: + """ + Consume results from a synchronizer until a condition is met or it fails. + + :return: Tuple of (should_remove_sync, fallback_to_fdv1) + """ + try: + for update in synchronizer.sync(): + print(f"Synchronizer {synchronizer.name} update: {update.state}") + if self._stop_event.is_set(): + return False, False + + # Handle the update + if update.change_set is not None: + self._store.apply(update.change_set, True) + + # Set ready event on first valid update + if update.state == DataSourceState.VALID and not set_on_ready.is_set(): + set_on_ready.set() + + # Update status + self._data_source_status_provider.update_status(update.state, update.error) + + # Check for OFF state indicating permanent failure + if update.state == DataSourceState.OFF: + return True, update.revert_to_fdv1 + + # Check condition periodically + current_status = self._data_source_status_provider.status + if condition_func(current_status): + return False, False + + except Exception as e: + print(f"Error consuming synchronizer results: {e}") + return True, False + + return True, False + + # def _run_persistent_store_outage_recovery(self): + # """Monitor persistent store status and trigger recovery when needed.""" + # # This is a simplified version - in a full implementation we'd need + # # to properly monitor store status and trigger commit operations + # # when 
the store comes back online after an outage + # pass + # + def _fallback_condition(self, status: DataSourceStatus) -> bool: + """ + Determine if we should fallback to secondary synchronizer. + + :param status: Current data source status + :return: True if fallback condition is met + """ + interrupted_at_runtime = ( + status.state == DataSourceState.INTERRUPTED + and time.time() - status.since > 60 # 1 minute + ) + cannot_initialize = ( + status.state == DataSourceState.INITIALIZING + and time.time() - status.since > 10 # 10 seconds + ) + + return interrupted_at_runtime or cannot_initialize + + def _recovery_condition(self, status: DataSourceStatus) -> bool: + """ + Determine if we should try to recover to primary synchronizer. + + :param status: Current data source status + :return: True if recovery condition is met + """ + interrupted_at_runtime = ( + status.state == DataSourceState.INTERRUPTED + and time.time() - status.since > 60 # 1 minute + ) + healthy_for_too_long = ( + status.state == DataSourceState.VALID + and time.time() - status.since > 300 # 5 minutes + ) + cannot_initialize = ( + status.state == DataSourceState.INITIALIZING + and time.time() - status.since > 10 # 10 seconds + ) + + return interrupted_at_runtime or healthy_for_too_long or cannot_initialize + + @property + def store(self) -> FeatureStore: + """Get the underlying store for flag evaluation.""" + return self._store.get_active_store() + + def set_flag_value_eval_fn(self, eval_fn): + """ + Set the flag value evaluation function for the flag tracker. 
+ + :param eval_fn: Function with signature (key: str, context: Context) -> Any + """ + self._flag_tracker = FlagTrackerImpl(self._flag_change_listeners, eval_fn) + + @property + def data_source_status_provider(self) -> DataSourceStatusProvider: + """Get the data source status provider.""" + return self._data_source_status_provider + + @property + def data_store_status_provider(self) -> DataStoreStatusProvider: + """Get the data store status provider.""" + raise NotImplementedError + # return self._data_store_status_provider + + @property + def flag_tracker(self) -> FlagTracker: + """Get the flag tracker for monitoring flag changes.""" + return self._flag_tracker + + @property + def data_availability(self) -> DataAvailability: + """Get the current data availability level.""" + if self._store.selector().is_defined(): + return DataAvailability.REFRESHED + + if not self._configured_with_data_sources or self._store.is_initialized(): + return DataAvailability.CACHED + + return DataAvailability.DEFAULTS + + @property + def target_availability(self) -> DataAvailability: + """Get the target data availability level based on configuration.""" + if self._configured_with_data_sources: + return DataAvailability.REFRESHED + + return DataAvailability.CACHED diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py new file mode 100644 index 00000000..435a0faf --- /dev/null +++ b/ldclient/impl/datasystem/store.py @@ -0,0 +1,355 @@ +""" +Store implementation for FDv2 data system. + +This module provides a dual-mode persistent/in-memory store that serves requests for data +from the evaluation algorithm. It manages both in-memory and persistent storage, handling +ChangeSet applications and flag change notifications. 
+""" + +import threading +from typing import Dict, List, Mapping, Optional, Set + +from ldclient.feature_store import InMemoryFeatureStore +from ldclient.impl.datasystem.protocolv2 import ( + Change, + ChangeSet, + ChangeType, + IntentCode, + ObjectKind, + Selector +) +from ldclient.impl.dependency_tracker import DependencyTracker, KindAndKey +from ldclient.impl.listeners import Listeners +from ldclient.interfaces import ( + DataStoreStatusProvider, + FeatureStore, + FlagChange +) +from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind + + +class Store: + """ + Store is a dual-mode persistent/in-memory store that serves requests for data from the evaluation + algorithm. + + At any given moment one of two stores is active: in-memory, or persistent. Once the in-memory + store has data (either from initializers or a synchronizer), the persistent store is no longer + read from. From that point forward, calls to get data will serve from the memory store. + """ + + def __init__( + self, + flag_change_listeners: Listeners, + change_set_listeners: Listeners, + ): + """ + Initialize a new Store. + + Args: + flag_change_listeners: Listeners for flag change events + change_set_listeners: Listeners for changeset events + """ + self._persistent_store: Optional[FeatureStore] = None + self._persistent_store_status_provider: Optional[DataStoreStatusProvider] = None + self._persistent_store_writable = False + + # Source of truth for flag evaluations once initialized + self._memory_store = InMemoryFeatureStore() + + # Used to track dependencies between items in the store + self._dependency_tracker = DependencyTracker() + + # Listeners for events + self._flag_change_listeners = flag_change_listeners + self._change_set_listeners = change_set_listeners + + # True if the data in the memory store may be persisted to the persistent store + self._persist = False + + # Points to the active store. Swapped upon initialization. 
def with_persistence(
    self,
    persistent_store: FeatureStore,
    writable: bool,
    status_provider: Optional[DataStoreStatusProvider] = None,
) -> "Store":
    """
    Configure the store with a persistent store for read-only or read-write access.

    :param persistent_store: the persistent store implementation
    :param writable: whether the persistent store should be written to
    :param status_provider: optional status provider for the persistent store
    :return: self, for method chaining
    """
    with self._lock:
        self._persistent_store = persistent_store
        self._persistent_store_writable = writable
        self._persistent_store_status_provider = status_provider

        # Until the memory store has received a basis, serve reads from the
        # persistent store, which may already hold data from a previous run.
        self._active_store = persistent_store

    return self

def selector(self) -> Selector:
    """Return the selector identifying the data currently held by the store."""
    with self._lock:
        return self._selector

def close(self) -> Optional[Exception]:
    """
    Close the store and the persistent store, if one is configured.

    :return: the exception raised while closing, if any; otherwise None
    """
    with self._lock:
        if self._persistent_store is not None:
            try:
                # Most FeatureStore implementations don't define close();
                # call it only when it exists.
                if hasattr(self._persistent_store, 'close'):
                    self._persistent_store.close()
            except Exception as e:
                return e
        return None

def apply(self, change_set: ChangeSet, persist: bool) -> None:
    """
    Apply a changeset to the store.

    :param change_set: the changeset to apply
    :param persist: whether the changes should be persisted to the persistent store
    """
    with self._lock:
        try:
            if change_set.intent_code == IntentCode.TRANSFER_FULL:
                self._set_basis(change_set, persist)
            elif change_set.intent_code == IntentCode.TRANSFER_CHANGES:
                self._apply_delta(change_set, persist)
            elif change_set.intent_code == IntentCode.TRANSFER_NONE:
                # No-op, nothing to apply
                return

            # Notify changeset listeners
            self._change_set_listeners.notify(change_set)

        except Exception as e:
            # Log the error but don't re-raise - matches Go behavior.
            # (Was a bare print(); use the SDK logger instead.)
            from ldclient.impl.util import (  # local import to avoid circular dependency
                log
            )

            log.error("Store: couldn't apply changeset: %s", e)

def _set_basis(self, change_set: ChangeSet, persist: bool) -> None:
    """
    Set the basis of the store. Any existing data is discarded.

    :param change_set: the changeset containing the new basis data
    :param persist: whether to persist the data to the persistent store
    """
    # Snapshot current data for change detection, but only when someone is
    # actually listening - the snapshot is not free.
    old_data: Optional[Mapping[VersionedDataKind, Mapping[str, dict]]] = None
    if self._flag_change_listeners.has_listeners():
        old_data = {}
        for kind in [FEATURES, SEGMENTS]:
            old_data[kind] = self._memory_store.all(kind, lambda x: x)

    # Convert changes to the format expected by FeatureStore.init()
    all_data = self._changes_to_store_data(change_set.changes)

    # Initialize memory store with new data
    self._memory_store.init(all_data)

    # Rebuild dependency tracking from the full data set
    self._reset_dependency_tracker(all_data)

    # Fire flag-change events for anything that differs from the snapshot
    if old_data is not None:
        affected_items = self._compute_changed_items_for_full_data_set(old_data, all_data)
        self._send_change_events(affected_items)

    # Update state
    self._persist = persist
    if change_set.selector is not None:
        self._selector = change_set.selector

    # The memory store now holds a complete basis, so it becomes the
    # source of truth for reads.
    self._active_store = self._memory_store

    # Persist to persistent store if configured and writable
    if self._should_persist():
        self._persistent_store.init(all_data)  # type: ignore

def _apply_delta(self, change_set: ChangeSet, persist: bool) -> None:
    """
    Apply a delta update to the store.

    :param change_set: the changeset containing the delta changes
    :param persist: whether to persist the changes to the persistent store
    """
    # NOTE(review): unlike _set_basis, self._persist is updated only after
    # the loop below, so _should_persist() here still reflects the previous
    # changeset's persist flag - confirm this matches the Go implementation.
    has_listeners = self._flag_change_listeners.has_listeners()
    affected_items: Set[KindAndKey] = set()

    # Apply each change
    for change in change_set.changes:
        if change.action == ChangeType.PUT:
            # Convert to VersionedDataKind
            kind = FEATURES if change.kind == ObjectKind.FLAG else SEGMENTS
            item = change.object
            if item is not None:
                self._memory_store.upsert(kind, item)

                # Update dependency tracking
                self._dependency_tracker.update_dependencies_from(kind, change.key, item)
                if has_listeners:
                    self._dependency_tracker.add_affected_items(
                        affected_items, KindAndKey(kind=kind, key=change.key)
                    )

                # Persist to persistent store if configured
                if self._should_persist():
                    self._persistent_store.upsert(kind, item)  # type: ignore

        elif change.action == ChangeType.DELETE:
            # Convert to VersionedDataKind
            kind = FEATURES if change.kind == ObjectKind.FLAG else SEGMENTS
            self._memory_store.delete(kind, change.key, change.version)

            # Update dependency tracking
            self._dependency_tracker.update_dependencies_from(kind, change.key, None)
            if has_listeners:
                self._dependency_tracker.add_affected_items(
                    affected_items, KindAndKey(kind=kind, key=change.key)
                )

            # Persist to persistent store if configured
            if self._should_persist():
                self._persistent_store.delete(kind, change.key, change.version)  # type: ignore

    # Send change events
    if affected_items:
        self._send_change_events(affected_items)

    # Update state
    self._persist = persist
    if change_set.selector is not None:
        self._selector = change_set.selector

def _should_persist(self) -> bool:
    """Return whether data should be written through to the persistent store."""
    return (
        self._persist
        and self._persistent_store is not None
        and self._persistent_store_writable
    )

def _changes_to_store_data(
    self, changes: List[Change]
) -> Mapping[VersionedDataKind, Mapping[str, dict]]:
    """
    Convert a list of Changes to the format expected by FeatureStore.init().

    Only PUT changes with a non-None object contribute; DELETEs are
    irrelevant when building a full basis.

    :param changes: list of changes to convert
    :return: mapping suitable for FeatureStore.init()
    """
    all_data: Dict[VersionedDataKind, Dict[str, dict]] = {
        FEATURES: {},
        SEGMENTS: {},
    }

    for change in changes:
        if change.action == ChangeType.PUT and change.object is not None:
            kind = FEATURES if change.kind == ObjectKind.FLAG else SEGMENTS
            all_data[kind][change.key] = change.object

    return all_data

def _reset_dependency_tracker(
    self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]
) -> None:
    """Reset the dependency tracker and rebuild it from a full data set."""
    self._dependency_tracker.reset()
    for kind, items in all_data.items():
        for key, item in items.items():
            self._dependency_tracker.update_dependencies_from(kind, key, item)

def _send_change_events(self, affected_items: Set[KindAndKey]) -> None:
    """Send flag change events for affected items (flags only, not segments)."""
    for item in affected_items:
        if item.kind == FEATURES:
            self._flag_change_listeners.notify(FlagChange(item.key))

def _compute_changed_items_for_full_data_set(
    self,
    old_data: Mapping[VersionedDataKind, Mapping[str, dict]],
    new_data: Mapping[VersionedDataKind, Mapping[str, dict]],
) -> Set[KindAndKey]:
    """
    Compute which items changed between old and new data sets.

    An item counts as changed when it exists in only one of the two sets,
    or when its "version" field differs between them.
    """
    affected_items: Set[KindAndKey] = set()

    for kind in [FEATURES, SEGMENTS]:
        old_items = old_data.get(kind, {})
        new_items = new_data.get(kind, {})

        # Get all keys from both old and new data
        all_keys = set(old_items.keys()) | set(new_items.keys())

        for key in all_keys:
            old_item = old_items.get(key)
            new_item = new_items.get(key)

            # If either side is missing or versions differ, it's a change
            if old_item is None or new_item is None:
                self._dependency_tracker.add_affected_items(
                    affected_items, KindAndKey(kind=kind, key=key)
                )
            elif old_item.get("version", 0) != new_item.get("version", 0):
                self._dependency_tracker.add_affected_items(
                    affected_items, KindAndKey(kind=kind, key=key)
                )

    return affected_items

def commit(self) -> Optional[Exception]:
    """
    Persist the data in the memory store to the persistent store, if configured.

    :return: the exception raised during the write, if any; otherwise None
    """
    with self._lock:
        if self._should_persist():
            try:
                # Copy everything out of the memory store and rewrite the
                # persistent store wholesale.
                all_data = {}
                for kind in [FEATURES, SEGMENTS]:
                    all_data[kind] = self._memory_store.all(kind, lambda x: x)
                self._persistent_store.init(all_data)  # type: ignore
            except Exception as e:
                return e
        return None

def get_active_store(self) -> FeatureStore:
    """Get the currently active store for reading data."""
    with self._lock:
        return self._active_store

def is_initialized(self) -> bool:
    """Check if the active store is initialized."""
    return self.get_active_store().initialized

def get_data_store_status_provider(self) -> Optional[DataStoreStatusProvider]:
    """Get the data store status provider for the persistent store, if configured."""
    with self._lock:
        return self._persistent_store_status_provider
    @property
    def name(self) -> str:
        """The fixed identifier of this data source ("TestDataV2")."""
        return "TestDataV2"
list(synchronizer.sync()) -# -# assert len(updates) == 1 -# assert updates[0].state == DataSourceState.VALID -# assert updates[0].error is None -# assert updates[0].revert_to_fdv1 is False -# assert updates[0].environment_id is None -# -# assert updates[0].change_set is not None -# assert len(updates[0].change_set.changes) == 1 -# assert updates[0].change_set.changes[0].action == ChangeType.DELETE -# assert updates[0].change_set.changes[0].kind == ObjectKind.FLAG -# assert updates[0].change_set.changes[0].key == "flag-key" -# assert updates[0].change_set.changes[0].version == 101 -# assert updates[0].change_set.selector is not None -# assert updates[0].change_set.selector.version == 300 -# assert updates[0].change_set.selector.state == "p:SOMETHING:300" -# assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL -# -# assert len(updates) == 1 -# assert updates[0].state == DataSourceState.VALID -# assert updates[0].error is None -# assert updates[0].revert_to_fdv1 is False -# assert updates[0].environment_id is None -# -# assert updates[0].change_set is not None -# assert len(updates[0].change_set.changes) == 0 -# assert updates[0].change_set.selector is not None -# assert updates[0].change_set.selector.version == 300 -# assert updates[0].change_set.selector.state == "p:SOMETHING:300" -# assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL -# -# -# def test_swallows_heartbeat(events): # pylint: disable=redefined-outer-name -# builder = list_sse_client( -# [ -# events[EventName.SERVER_INTENT], -# events[EventName.HEARTBEAT], -# events[EventName.PAYLOAD_TRANSFERRED], -# ] -# ) -# -# synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) -# updates = list(synchronizer.sync()) -# -# assert len(updates) == 1 -# assert updates[0].state == DataSourceState.VALID -# assert updates[0].error is None -# assert updates[0].revert_to_fdv1 is False -# assert updates[0].environment_id is None -# -# assert updates[0].change_set is not None -# 
assert len(updates[0].change_set.changes) == 0 -# assert updates[0].change_set.selector is not None -# assert updates[0].change_set.selector.version == 300 -# assert updates[0].change_set.selector.state == "p:SOMETHING:300" -# assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL -# -# def test_generic_error_interrupts_and_recovers(): builder = ChangeSetBuilder() builder.start(intent=IntentCode.TRANSFER_FULL) diff --git a/ldclient/testing/impl/datasystem/test_config.py b/ldclient/testing/impl/datasystem/test_config.py index c7c0925b..db73aece 100644 --- a/ldclient/testing/impl/datasystem/test_config.py +++ b/ldclient/testing/impl/datasystem/test_config.py @@ -1,12 +1,11 @@ import dataclasses -from unittest.mock import MagicMock, Mock +from unittest.mock import Mock import pytest from ldclient.config import Config as LDConfig -from ldclient.impl.datasystem import Initializer, Synchronizer +from ldclient.config import DataSystemConfig from ldclient.impl.datasystem.config import ( - Config, ConfigBuilder, custom, default, @@ -63,7 +62,7 @@ def test_config_builder_build_success(): config = builder.build() - assert isinstance(config, Config) + assert isinstance(config, DataSystemConfig) assert config.initializers == [mock_initializer] assert config.primary_synchronizer == mock_primary assert config.secondary_synchronizer == mock_secondary @@ -178,11 +177,11 @@ def test_polling_config_builder(): def test_config_dataclass_immutability(): - """Test that Config instances are immutable (frozen dataclass).""" + """Test that DataSystemConfig instances are immutable (frozen dataclass).""" mock_primary = Mock() mock_secondary = Mock() - config = Config( + config = DataSystemConfig( initializers=None, primary_synchronizer=mock_primary, secondary_synchronizer=mock_secondary, diff --git a/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py b/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py new file mode 100644 index 00000000..b0db1426 --- /dev/null +++ 
# pylint: disable=missing-docstring

from threading import Event
from typing import List

from mock import Mock

from ldclient.config import DataSystemConfig
from ldclient.impl.datasystem import DataAvailability, Synchronizer
from ldclient.impl.datasystem.fdv2 import FDv2
from ldclient.integrations.test_datav2 import TestDataV2
from ldclient.interfaces import DataSourceState, DataSourceStatus, FlagChange


def test_two_phase_init():
    """Data is served first from an initializer, then a synchronizer takes over."""
    td_initializer = TestDataV2.data_source()
    td_initializer.update(td_initializer.flag("feature-flag").on(True))

    td_synchronizer = TestDataV2.data_source()
    td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True))
    config = DataSystemConfig(
        initializers=[td_initializer.build_initializer],
        primary_synchronizer=td_synchronizer.build_synchronizer,
    )

    set_on_ready = Event()
    fdv2 = FDv2(config)

    changed = Event()
    changes: List[FlagChange] = []
    count = 0

    # Expect two change notifications: one when the synchronizer's basis
    # replaces the initializer's data, one for the update made below.
    def listener(flag_change: FlagChange):
        nonlocal count, changes
        count += 1
        changes.append(flag_change)

        if count == 2:
            changed.set()

    fdv2.flag_tracker.add_listener(listener)

    fdv2.start(set_on_ready)
    assert set_on_ready.wait(1), "Data system did not become ready in time"

    td_synchronizer.update(td_synchronizer.flag("feature-flag").on(False))
    assert changed.wait(1), "Flag change listener was not called in time"
    assert len(changes) == 2
    assert changes[0].key == "feature-flag"
    assert changes[1].key == "feature-flag"


def test_can_stop_fdv2():
    """After stop(), further upstream updates must not reach listeners."""
    td = TestDataV2.data_source()
    config = DataSystemConfig(
        initializers=None,
        primary_synchronizer=td.build_synchronizer,
    )

    set_on_ready = Event()
    fdv2 = FDv2(config)

    changed = Event()
    changes: List[FlagChange] = []

    def listener(flag_change: FlagChange):
        changes.append(flag_change)
        changed.set()

    fdv2.flag_tracker.add_listener(listener)

    fdv2.start(set_on_ready)
    assert set_on_ready.wait(1), "Data system did not become ready in time"

    fdv2.stop()

    td.update(td.flag("feature-flag").on(False))
    assert changed.wait(1) is False, "Flag change listener was erroneously called"
    assert len(changes) == 0


def test_fdv2_data_availability_is_refreshed_with_data():
    """A running synchronizer should report REFRESHED availability."""
    td = TestDataV2.data_source()
    config = DataSystemConfig(
        initializers=None,
        primary_synchronizer=td.build_synchronizer,
    )

    set_on_ready = Event()
    fdv2 = FDv2(config)

    fdv2.start(set_on_ready)
    assert set_on_ready.wait(1), "Data system did not become ready in time"

    assert fdv2.data_availability.at_least(DataAvailability.REFRESHED)
    assert fdv2.target_availability.at_least(DataAvailability.REFRESHED)


def test_fdv2_fallsback_to_secondary_synchronizer():
    """When the primary synchronizer yields nothing, FDv2 falls back to the secondary."""
    mock: Synchronizer = Mock()
    mock.sync.return_value = iter([])  # sync() ends immediately without yielding updates
    td = TestDataV2.data_source()
    td.update(td.flag("feature-flag").on(True))
    config = DataSystemConfig(
        initializers=[td.build_initializer],
        primary_synchronizer=lambda: mock,  # primary produces no data, forcing fallback
        secondary_synchronizer=td.build_synchronizer,
    )

    changed = Event()
    changes: List[FlagChange] = []
    count = 0

    def listener(flag_change: FlagChange):
        nonlocal count, changes
        count += 1
        changes.append(flag_change)

        if count == 2:
            changed.set()

    set_on_ready = Event()
    fdv2 = FDv2(config)
    fdv2.flag_tracker.add_listener(listener)
    fdv2.start(set_on_ready)
    assert set_on_ready.wait(1), "Data system did not become ready in time"

    td.update(td.flag("feature-flag").on(False))
    assert changed.wait(1), "Flag change listener was not called in time"
    assert len(changes) == 2
    assert changes[0].key == "feature-flag"
    assert changes[1].key == "feature-flag"


# NOTE(review): function name has a typo ("shutdown_down"); rename in a follow-up.
def test_fdv2_shutdown_down_if_both_synchronizers_fail():
    """When both synchronizers fail to produce data, FDv2 transitions to OFF."""
    mock: Synchronizer = Mock()
    mock.sync.return_value = iter([])  # sync() ends immediately without yielding updates
    td = TestDataV2.data_source()
    td.update(td.flag("feature-flag").on(True))
    config = DataSystemConfig(
        initializers=[td.build_initializer],
        primary_synchronizer=lambda: mock,  # primary produces no data
        secondary_synchronizer=lambda: mock,  # secondary fails the same way
    )

    changed = Event()

    def listener(status: DataSourceStatus):
        if status.state == DataSourceState.OFF:
            changed.set()

    set_on_ready = Event()
    fdv2 = FDv2(config)
    fdv2.data_source_status_provider.add_listener(listener)
    fdv2.start(set_on_ready)
    assert set_on_ready.wait(1), "Data system did not become ready in time"

    assert changed.wait(1), "Data system did not shut down in time"
    assert fdv2.data_source_status_provider.status.state == DataSourceState.OFF