Skip to content

Commit c22ad26

Browse files
authored
chore: Refactor FDv1 behind the datasystem interface (#347)
1 parent 9a21a7a commit c22ad26

File tree

4 files changed

+210
-24
lines changed

ldclient/client.py

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -249,26 +249,32 @@ def __start_up(self, start_wait: float):
249249
self.__hooks_lock = ReadWriteLock()
250250
self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook]
251251

252-
data_store_listeners = Listeners()
253-
store_sink = DataStoreUpdateSinkImpl(data_store_listeners)
254-
store = _FeatureStoreClientWrapper(self._config.feature_store, store_sink)
255-
256-
self.__data_store_status_provider = DataStoreStatusProviderImpl(store, store_sink)
257-
258-
data_source_listeners = Listeners()
259-
flag_change_listeners = Listeners()
260-
261-
self.__flag_tracker = FlagTrackerImpl(flag_change_listeners, lambda key, context: self.variation(key, context, None))
252+
# Initialize data system (FDv1) to encapsulate v1 data plumbing
253+
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
254+
FDv1
255+
)
262256

263-
self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, data_source_listeners, flag_change_listeners)
264-
self.__data_source_status_provider = DataSourceStatusProviderImpl(data_source_listeners, self._config._data_source_update_sink)
265-
self._store = store # type: FeatureStore
257+
self._data_system = FDv1(self._config)
258+
# Provide flag evaluation function for value-change tracking
259+
self._data_system.set_flag_value_eval_fn(
260+
lambda key, context: self.variation(key, context, None)
261+
)
262+
# Expose providers and store from data system
263+
self.__data_store_status_provider = self._data_system.data_store_status_provider
264+
self.__data_source_status_provider = (
265+
self._data_system.data_source_status_provider
266+
)
267+
self.__flag_tracker = self._data_system.flag_tracker
268+
self._store = self._data_system.store # type: FeatureStore
266269

267270
big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
268271
self.__big_segment_store_manager = big_segment_store_manager
269272

270273
self._evaluator = Evaluator(
271-
lambda key: _get_store_item(store, FEATURES, key), lambda key: _get_store_item(store, SEGMENTS, key), lambda key: big_segment_store_manager.get_user_membership(key), log
274+
lambda key: _get_store_item(self._store, FEATURES, key),
275+
lambda key: _get_store_item(self._store, SEGMENTS, key),
276+
lambda key: big_segment_store_manager.get_user_membership(key),
277+
log,
272278
)
273279

274280
if self._config.offline:
@@ -279,11 +285,13 @@ def __start_up(self, start_wait: float):
279285

280286
diagnostic_accumulator = self._set_event_processor(self._config)
281287

288+
# Pass diagnostic accumulator to data system for streaming metrics
289+
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator)
290+
282291
self.__register_plugins(environment_metadata)
283292

284293
update_processor_ready = threading.Event()
285-
self._update_processor = self._make_update_processor(self._config, self._store, update_processor_ready, diagnostic_accumulator)
286-
self._update_processor.start()
294+
self._data_system.start(update_processor_ready)
287295

288296
if not self._config.offline and not self._config.use_ldd:
289297
if start_wait > 60:
@@ -293,7 +301,7 @@ def __start_up(self, start_wait: float):
293301
log.info("Waiting up to " + str(start_wait) + " seconds for LaunchDarkly client to initialize...")
294302
update_processor_ready.wait(start_wait)
295303

296-
if self._update_processor.initialized() is True:
304+
if self.is_initialized() is True:
297305
log.info("Started LaunchDarkly Client: OK")
298306
else:
299307
log.warning("Initialization timeout exceeded for LaunchDarkly Client or an error occurred. " "Feature Flags may not yet be available.")
@@ -379,7 +387,7 @@ def close(self):
379387
"""
380388
log.info("Closing LaunchDarkly client..")
381389
self._event_processor.stop()
382-
self._update_processor.stop()
390+
self._data_system.stop()
383391
self.__big_segment_store_manager.stop()
384392

385393
# These magic methods allow a client object to be automatically cleaned up by the "with" scope operator
@@ -464,7 +472,14 @@ def is_initialized(self) -> bool:
464472
unsuccessful attempt, or it might have received an unrecoverable error (such as an invalid SDK key)
465473
and given up.
466474
"""
467-
return self.is_offline() or self._config.use_ldd or self._update_processor.initialized()
475+
if self.is_offline() or self._config.use_ldd:
476+
return True
477+
478+
return (
479+
self._data_system._update_processor.initialized()
480+
if self._data_system._update_processor
481+
else False
482+
)
468483

469484
def flush(self):
470485
"""Flushes all pending analytics events.

ldclient/impl/datasystem/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def start(self, set_on_ready: Event):
6868
Starts the data system.
6969
7070
This method will return immediately. The provided `Event` will be set when the system
71-
has reached an initial state (either permanently faile, e.g. due to bad auth, or
71+
has reached an initial state (either permanently failed, e.g. due to bad auth, or
7272
succeeded)
7373
"""
7474
raise NotImplementedError

ldclient/impl/datasystem/fdv1.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
from threading import Event
2+
from typing import Optional
3+
4+
from ldclient.config import Config
5+
from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl
6+
from ldclient.impl.datasource.polling import PollingUpdateProcessor
7+
from ldclient.impl.datasource.status import (
8+
DataSourceStatusProviderImpl,
9+
DataSourceUpdateSinkImpl
10+
)
11+
from ldclient.impl.datasource.streaming import StreamingUpdateProcessor
12+
from ldclient.impl.datastore.status import (
13+
DataStoreStatusProviderImpl,
14+
DataStoreUpdateSinkImpl
15+
)
16+
from ldclient.impl.datasystem import DataAvailability
17+
from ldclient.impl.flag_tracker import FlagTrackerImpl
18+
from ldclient.impl.listeners import Listeners
19+
from ldclient.impl.stubs import NullUpdateProcessor
20+
from ldclient.interfaces import (
21+
DataSourceState,
22+
DataSourceStatus,
23+
DataSourceStatusProvider,
24+
DataStoreStatusProvider,
25+
FeatureStore,
26+
FlagTracker,
27+
UpdateProcessor
28+
)
29+
30+
# Delayed import inside __init__ to avoid circular dependency with ldclient.client
31+
32+
33+
class FDv1:
34+
"""
35+
FDv1 wires the existing v1 data source and store behavior behind the
36+
generic DataSystem surface.
37+
"""
38+
39+
def __init__(self, config: Config):
40+
self._config = config
41+
42+
# Set up data store plumbing
43+
self._data_store_listeners = Listeners()
44+
self._data_store_update_sink = DataStoreUpdateSinkImpl(
45+
self._data_store_listeners
46+
)
47+
# Import here to avoid circular import
48+
from ldclient.client import _FeatureStoreClientWrapper
49+
50+
self._store_wrapper: FeatureStore = _FeatureStoreClientWrapper(
51+
self._config.feature_store, self._data_store_update_sink
52+
)
53+
self._data_store_status_provider_impl = DataStoreStatusProviderImpl(
54+
self._store_wrapper, self._data_store_update_sink
55+
)
56+
57+
# Set up data source plumbing
58+
self._data_source_listeners = Listeners()
59+
self._flag_change_listeners = Listeners()
60+
self._flag_tracker_impl = FlagTrackerImpl(
61+
self._flag_change_listeners,
62+
lambda key, context: None, # Replaced by client to use its evaluation method
63+
)
64+
self._data_source_update_sink = DataSourceUpdateSinkImpl(
65+
self._store_wrapper,
66+
self._data_source_listeners,
67+
self._flag_change_listeners,
68+
)
69+
self._data_source_status_provider_impl = DataSourceStatusProviderImpl(
70+
self._data_source_listeners, self._data_source_update_sink
71+
)
72+
73+
# Ensure v1 processors can find the sink via config for status updates
74+
self._config._data_source_update_sink = self._data_source_update_sink
75+
76+
# Update processor created in start(), because it needs the ready Event
77+
self._update_processor: Optional[UpdateProcessor] = None
78+
79+
# Diagnostic accumulator provided by client for streaming metrics
80+
self._diagnostic_accumulator = None
81+
82+
# Track current data availability
83+
self._data_availability: DataAvailability = (
84+
DataAvailability.CACHED
85+
if getattr(self._store_wrapper, "initialized", False)
86+
else DataAvailability.DEFAULTS
87+
)
88+
89+
# React to data source status updates to adjust availability
90+
def _on_status_change(status: DataSourceStatus):
91+
if status.state == DataSourceState.VALID:
92+
self._data_availability = DataAvailability.REFRESHED
93+
94+
self._data_source_status_provider_impl.add_listener(_on_status_change)
95+
96+
def start(self, set_on_ready: Event):
97+
"""
98+
Starts the v1 update processor and returns immediately. The provided
99+
Event is set by the processor upon first successful initialization or
100+
upon permanent failure.
101+
"""
102+
update_processor = self._make_update_processor(
103+
self._config, self._store_wrapper, set_on_ready
104+
)
105+
self._update_processor = update_processor
106+
update_processor.start()
107+
108+
def stop(self):
109+
if self._update_processor is not None:
110+
self._update_processor.stop()
111+
112+
@property
113+
def store(self) -> FeatureStore:
114+
return self._store_wrapper
115+
116+
def set_flag_value_eval_fn(self, eval_fn):
117+
"""
118+
Injects the flag value evaluation function used by the flag tracker to
119+
compute FlagValueChange events. The function signature should be
120+
(key: str, context: Context) -> Any.
121+
"""
122+
self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn)
123+
124+
def set_diagnostic_accumulator(self, diagnostic_accumulator):
125+
"""
126+
Sets the diagnostic accumulator for streaming initialization metrics.
127+
This should be called before start() to ensure metrics are collected.
128+
"""
129+
self._diagnostic_accumulator = diagnostic_accumulator
130+
131+
@property
132+
def data_source_status_provider(self) -> DataSourceStatusProvider:
133+
return self._data_source_status_provider_impl
134+
135+
@property
136+
def data_store_status_provider(self) -> DataStoreStatusProvider:
137+
return self._data_store_status_provider_impl
138+
139+
@property
140+
def flag_tracker(self) -> FlagTracker:
141+
return self._flag_tracker_impl
142+
143+
@property
144+
def data_availability(self) -> DataAvailability:
145+
return self._data_availability
146+
147+
@property
148+
def target_availability(self) -> DataAvailability:
149+
if self._config.offline:
150+
return DataAvailability.DEFAULTS
151+
# In LDD mode or normal connected modes, the ideal is to be refreshed
152+
return DataAvailability.REFRESHED
153+
154+
def _make_update_processor(self, config: Config, store: FeatureStore, ready: Event):
155+
# Mirrors LDClient._make_update_processor but scoped for FDv1
156+
if config.update_processor_class:
157+
return config.update_processor_class(config, store, ready)
158+
159+
if config.offline or config.use_ldd:
160+
return NullUpdateProcessor(config, store, ready)
161+
162+
if config.stream:
163+
return StreamingUpdateProcessor(config, store, ready, self._diagnostic_accumulator)
164+
165+
# Polling mode
166+
feature_requester = (
167+
config.feature_requester_class(config)
168+
if config.feature_requester_class is not None
169+
else FeatureRequesterImpl(config)
170+
)
171+
return PollingUpdateProcessor(config, feature_requester, store, ready)

ldclient/testing/test_ldclient.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,24 @@ def count_events(c):
5959

6060
def test_client_has_null_update_processor_in_offline_mode():
6161
with make_offline_client() as client:
62-
assert isinstance(client._update_processor, NullUpdateProcessor)
62+
assert isinstance(client._data_system._update_processor, NullUpdateProcessor)
6363

6464

6565
def test_client_has_null_update_processor_in_ldd_mode():
6666
with make_ldd_client() as client:
67-
assert isinstance(client._update_processor, NullUpdateProcessor)
67+
assert isinstance(client._data_system._update_processor, NullUpdateProcessor)
6868

6969

7070
def test_client_has_streaming_processor_by_default():
7171
config = Config(sdk_key="secret", base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False)
7272
with LDClient(config=config, start_wait=0) as client:
73-
assert isinstance(client._update_processor, StreamingUpdateProcessor)
73+
assert isinstance(client._data_system._update_processor, StreamingUpdateProcessor)
7474

7575

7676
def test_client_has_polling_processor_if_streaming_is_disabled():
7777
config = Config(sdk_key="secret", stream=False, base_uri=unreachable_uri, stream_uri=unreachable_uri, send_events=False)
7878
with LDClient(config=config, start_wait=0) as client:
79-
assert isinstance(client._update_processor, PollingUpdateProcessor)
79+
assert isinstance(client._data_system._update_processor, PollingUpdateProcessor)
8080

8181

8282
def test_toggle_offline():

0 commit comments

Comments (0)