diff --git a/ldclient/client.py b/ldclient/client.py index 1a9b7993..091b064f 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -249,26 +249,32 @@ def __start_up(self, start_wait: float): self.__hooks_lock = ReadWriteLock() self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook] - data_store_listeners = Listeners() - store_sink = DataStoreUpdateSinkImpl(data_store_listeners) - store = _FeatureStoreClientWrapper(self._config.feature_store, store_sink) - - self.__data_store_status_provider = DataStoreStatusProviderImpl(store, store_sink) - - data_source_listeners = Listeners() - flag_change_listeners = Listeners() - - self.__flag_tracker = FlagTrackerImpl(flag_change_listeners, lambda key, context: self.variation(key, context, None)) + # Initialize data system (FDv1) to encapsulate v1 data plumbing + from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency + FDv1 + ) - self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, data_source_listeners, flag_change_listeners) - self.__data_source_status_provider = DataSourceStatusProviderImpl(data_source_listeners, self._config._data_source_update_sink) - self._store = store # type: FeatureStore + self._data_system = FDv1(self._config) + # Provide flag evaluation function for value-change tracking + self._data_system.set_flag_value_eval_fn( + lambda key, context: self.variation(key, context, None) + ) + # Expose providers and store from data system + self.__data_store_status_provider = self._data_system.data_store_status_provider + self.__data_source_status_provider = ( + self._data_system.data_source_status_provider + ) + self.__flag_tracker = self._data_system.flag_tracker + self._store = self._data_system.store # type: FeatureStore big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments) self.__big_segment_store_manager = big_segment_store_manager self._evaluator = Evaluator( - lambda key: _get_store_item(store, FEATURES, key), lambda key: _get_store_item(store, SEGMENTS, key), lambda key: big_segment_store_manager.get_user_membership(key), log + lambda key: _get_store_item(self._store, FEATURES, key), + lambda key: _get_store_item(self._store, SEGMENTS, key), + lambda key: big_segment_store_manager.get_user_membership(key), + log, ) if self._config.offline: @@ -279,11 +285,13 @@ def __start_up(self, start_wait: float): diagnostic_accumulator = self._set_event_processor(self._config) + # Pass diagnostic accumulator to data system for streaming metrics + self._data_system.set_diagnostic_accumulator(diagnostic_accumulator) + self.__register_plugins(environment_metadata) update_processor_ready = threading.Event() - self._update_processor = self._make_update_processor(self._config, self._store, update_processor_ready, diagnostic_accumulator) - self._update_processor.start() + self._data_system.start(update_processor_ready) if not self._config.offline and not self._config.use_ldd: if start_wait > 60: @@ -293,7 +301,7 @@ def __start_up(self, start_wait: float): log.info("Waiting up to " + str(start_wait) + " seconds for LaunchDarkly client to initialize...") update_processor_ready.wait(start_wait) - if self._update_processor.initialized() is True: + if self.is_initialized() is True: log.info("Started LaunchDarkly Client: OK") else: log.warning("Initialization timeout exceeded for LaunchDarkly Client or an error occurred. " "Feature Flags may not yet be available.") @@ -379,7 +387,7 @@ def close(self): """ log.info("Closing LaunchDarkly client..") self._event_processor.stop() - self._update_processor.stop() + self._data_system.stop() self.__big_segment_store_manager.stop() # These magic methods allow a client object to be automatically cleaned up by the "with" scope operator @@ -464,7 +472,14 @@ def is_initialized(self) -> bool: unsuccessful attempt, or it might have received an unrecoverable error (such as an invalid SDK key) and given up. """ - return self.is_offline() or self._config.use_ldd or self._update_processor.initialized() + if self.is_offline() or self._config.use_ldd: + return True + + return ( + self._data_system._update_processor.initialized() + if self._data_system._update_processor + else False + ) def flush(self): """Flushes all pending analytics events. diff --git a/ldclient/impl/datasystem/__init__.py b/ldclient/impl/datasystem/__init__.py index 210fc7f7..9c5bf6d6 100644 --- a/ldclient/impl/datasystem/__init__.py +++ b/ldclient/impl/datasystem/__init__.py @@ -68,7 +68,7 @@ def start(self, set_on_ready: Event): Starts the data system. This method will return immediately. The provided `Event` will be set when the system - has reached an initial state (either permanently faile, e.g. due to bad auth, or + has reached an initial state (either permanently failed, e.g. due to bad auth, or succeeded) """ raise NotImplementedError diff --git a/ldclient/impl/datasystem/fdv1.py b/ldclient/impl/datasystem/fdv1.py new file mode 100644 index 00000000..d291aba3 --- /dev/null +++ b/ldclient/impl/datasystem/fdv1.py @@ -0,0 +1,171 @@ +from threading import Event +from typing import Optional + +from ldclient.config import Config +from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl +from ldclient.impl.datasource.polling import PollingUpdateProcessor +from ldclient.impl.datasource.status import ( + DataSourceStatusProviderImpl, + DataSourceUpdateSinkImpl +) +from ldclient.impl.datasource.streaming import StreamingUpdateProcessor +from ldclient.impl.datastore.status import ( + DataStoreStatusProviderImpl, + DataStoreUpdateSinkImpl +) +from ldclient.impl.datasystem import DataAvailability +from ldclient.impl.flag_tracker import FlagTrackerImpl +from ldclient.impl.listeners import Listeners +from ldclient.impl.stubs import NullUpdateProcessor +from ldclient.interfaces import ( + DataSourceState, + DataSourceStatus, + DataSourceStatusProvider, + DataStoreStatusProvider, + FeatureStore, + FlagTracker, + UpdateProcessor +) + +# Delayed import inside __init__ to avoid circular dependency with ldclient.client + + +class FDv1: + """ + FDv1 wires the existing v1 data source and store behavior behind the + generic DataSystem surface. + """ + + def __init__(self, config: Config): + self._config = config + + # Set up data store plumbing + self._data_store_listeners = Listeners() + self._data_store_update_sink = DataStoreUpdateSinkImpl( + self._data_store_listeners + ) + # Import here to avoid circular import + from ldclient.client import _FeatureStoreClientWrapper + + self._store_wrapper: FeatureStore = _FeatureStoreClientWrapper( + self._config.feature_store, self._data_store_update_sink + ) + self._data_store_status_provider_impl = DataStoreStatusProviderImpl( + self._store_wrapper, self._data_store_update_sink + ) + + # Set up data source plumbing + self._data_source_listeners = Listeners() + self._flag_change_listeners = Listeners() + self._flag_tracker_impl = FlagTrackerImpl( + self._flag_change_listeners, + lambda key, context: None, # Replaced by client to use its evaluation method + ) + self._data_source_update_sink = DataSourceUpdateSinkImpl( + self._store_wrapper, + self._data_source_listeners, + self._flag_change_listeners, + ) + self._data_source_status_provider_impl = DataSourceStatusProviderImpl( + self._data_source_listeners, self._data_source_update_sink + ) + + # Ensure v1 processors can find the sink via config for status updates + self._config._data_source_update_sink = self._data_source_update_sink + + # Update processor created in start(), because it needs the ready Event + self._update_processor: Optional[UpdateProcessor] = None + + # Diagnostic accumulator provided by client for streaming metrics + self._diagnostic_accumulator = None + + # Track current data availability + self._data_availability: DataAvailability = ( + DataAvailability.CACHED + if getattr(self._store_wrapper, "initialized", False) + else DataAvailability.DEFAULTS + ) + + # React to data source status updates to adjust availability + def _on_status_change(status: DataSourceStatus): + if status.state == DataSourceState.VALID: + self._data_availability = DataAvailability.REFRESHED + + self._data_source_status_provider_impl.add_listener(_on_status_change) + + def start(self, set_on_ready: Event): + """ + Starts the v1 update processor and returns immediately. The provided + Event is set by the processor upon first successful initialization or + upon permanent failure. + """ + update_processor = self._make_update_processor( + self._config, self._store_wrapper, set_on_ready + ) + self._update_processor = update_processor + update_processor.start() + + def stop(self): + if self._update_processor is not None: + self._update_processor.stop() + + @property + def store(self) -> FeatureStore: + return self._store_wrapper + + def set_flag_value_eval_fn(self, eval_fn): + """ + Injects the flag value evaluation function used by the flag tracker to + compute FlagValueChange events. The function signature should be + (key: str, context: Context) -> Any. + """ + self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn) + + def set_diagnostic_accumulator(self, diagnostic_accumulator): + """ + Sets the diagnostic accumulator for streaming initialization metrics. + This should be called before start() to ensure metrics are collected. + """ + self._diagnostic_accumulator = diagnostic_accumulator + + @property + def data_source_status_provider(self) -> DataSourceStatusProvider: + return self._data_source_status_provider_impl + + @property + def data_store_status_provider(self) -> DataStoreStatusProvider: + return self._data_store_status_provider_impl + + @property + def flag_tracker(self) -> FlagTracker: + return self._flag_tracker_impl + + @property + def data_availability(self) -> DataAvailability: + return self._data_availability + + @property + def target_availability(self) -> DataAvailability: + if self._config.offline: + return DataAvailability.DEFAULTS + # In LDD mode or normal connected modes, the ideal is to be refreshed + return DataAvailability.REFRESHED + + def _make_update_processor(self, config: Config, store: FeatureStore, ready: Event): + # Mirrors LDClient._make_update_processor but scoped for FDv1 + if config.update_processor_class: + return config.update_processor_class(config, store, ready) + + if config.offline or config.use_ldd: + return NullUpdateProcessor(config, store, ready) + + if config.stream: + return StreamingUpdateProcessor(config, store, ready, self._diagnostic_accumulator) + + # Polling mode + feature_requester = ( + config.feature_requester_class(config) + if config.feature_requester_class is not None + else FeatureRequesterImpl(config) + ) + return PollingUpdateProcessor(config, feature_requester, store, ready) diff --git a/ldclient/testing/test_ldclient.py b/ldclient/testing/test_ldclient.py index 997312e1..6ce5442a 100644 --- a/ldclient/testing/test_ldclient.py +++ b/ldclient/testing/test_ldclient.py @@ -59,24 +59,24 @@ def count_events(c): def test_client_has_null_update_processor_in_offline_mode(): with make_offline_client() as client: - assert isinstance(client._update_processor, NullUpdateProcessor) + assert isinstance(client._data_system._update_processor, NullUpdateProcessor) def test_client_has_null_update_processor_in_ldd_mode(): with make_ldd_client() as client: - assert isinstance(client._update_processor, NullUpdateProcessor) + assert isinstance(client._data_system._update_processor, NullUpdateProcessor) def test_client_has_streaming_processor_by_default(): config = Config(sdk_key="secret", base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False) with LDClient(config=config, start_wait=0) as client: - assert isinstance(client._update_processor, StreamingUpdateProcessor) + assert isinstance(client._data_system._update_processor, StreamingUpdateProcessor) def test_client_has_polling_processor_if_streaming_is_disabled(): config = Config(sdk_key="secret", stream=False, base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False) with LDClient(config=config, start_wait=0) as client: - assert isinstance(client._update_processor, PollingUpdateProcessor) + assert isinstance(client._data_system._update_processor, PollingUpdateProcessor) def test_toggle_offline():