1+ import sys
12import json
23import logging
4+ from urllib .parse import urlparse
35
46import requests
57from big_segment_store_fixture import BigSegmentStoreFixture
2123 polling_ds_builder ,
2224 streaming_ds_builder
2325)
26+ from ldclient .feature_store import CacheConfig
2427from ldclient .impl .datasourcev2 .polling import PollingDataSourceBuilder
28+ from ldclient .integrations import Consul , DynamoDB , Redis
29+ from ldclient .interfaces import DataStoreMode
2530
2631
2732class ClientEntity :
@@ -102,6 +107,19 @@ def __init__(self, tag, config):
102107 if datasystem_config .get ("payloadFilter" ) is not None :
103108 opts ["payload_filter_key" ] = datasystem_config ["payloadFilter" ]
104109
110+ # Handle persistent data store configuration for dataSystem
111+ store_config = datasystem_config .get ("store" )
112+ if store_config is not None :
113+ persistent_store_config = store_config .get ("persistentDataStore" )
114+ if persistent_store_config is not None :
115+ store = _create_persistent_store (persistent_store_config )
116+
117+ # Parse store mode (0 = READ_ONLY, 1 = READ_WRITE)
118+ store_mode_value = datasystem_config .get ("storeMode" , 0 )
119+ store_mode = DataStoreMode .READ_WRITE if store_mode_value == 1 else DataStoreMode .READ_ONLY
120+
121+ datasystem .data_store (store , store_mode )
122+
105123 opts ["datasystem_config" ] = datasystem .build ()
106124
107125 elif config .get ("streaming" ) is not None :
@@ -111,14 +129,16 @@ def __init__(self, tag, config):
111129 if streaming .get ("filter" ) is not None :
112130 opts ["payload_filter_key" ] = streaming ["filter" ]
113131 _set_optional_time_prop (streaming , "initialRetryDelayMs" , opts , "initial_reconnect_delay" )
114- else :
132+ elif config . get ( "polling" ) is not None :
115133 opts ['stream' ] = False
116134 polling = config ["polling" ]
117135 if polling .get ("baseUri" ) is not None :
118136 opts ["base_uri" ] = polling ["baseUri" ]
119137 if polling .get ("filter" ) is not None :
120138 opts ["payload_filter_key" ] = polling ["filter" ]
121139 _set_optional_time_prop (polling , "pollIntervalMs" , opts , "poll_interval" )
140+ else :
141+ opts ['use_ldd' ] = True
122142
123143 if config .get ("events" ) is not None :
124144 events = config ["events" ]
@@ -148,6 +168,9 @@ def __init__(self, tag, config):
148168 _set_optional_time_prop (big_params , "staleAfterMs" , big_config , "stale_after" )
149169 opts ["big_segments" ] = BigSegmentsConfig (** big_config )
150170
171+ if config .get ("persistentDataStore" ) is not None :
172+ opts ["feature_store" ] = _create_persistent_store (config ["persistentDataStore" ])
173+
151174 start_wait = config .get ("startWaitTimeMs" ) or 5000
152175 config = Config (** opts )
153176
@@ -285,3 +308,72 @@ def _set_optional_time_prop(params_in: dict, name_in: str, params_out: dict, nam
285308 if params_in .get (name_in ) is not None :
286309 params_out [name_out ] = params_in [name_in ] / 1000.0
287310 return None
311+
312+
def _create_persistent_store(persistent_store_config: dict):
    """
    Construct a persistent feature store (Redis, DynamoDB, or Consul) from
    the contract-test harness configuration.

    Shared by both the v2 top-level ``persistentDataStore`` setting and the
    v3 ``dataSystem`` store setting.

    :param persistent_store_config: dict containing a ``store`` section
        (``type``, ``dsn``, optional ``prefix``) and an optional ``cache``
        section (``mode``, plus ``ttl`` seconds when the mode is ``"ttl"``)
    :return: a feature store instance built via ``ldclient.integrations``
    :raises ValueError: when ``store.type`` is not a supported backend
    """
    store_section = persistent_store_config["store"]
    kind = store_section["type"]
    dsn = store_section["dsn"]
    key_prefix = store_section.get("prefix")

    # Translate the harness cache settings into an SDK CacheConfig.
    cache_section = persistent_store_config.get("cache", {})
    mode = cache_section.get("mode", "ttl")
    if mode == "off":
        caching = CacheConfig.disabled()
    elif mode == "infinite":
        # No practical expiry: use the largest representable TTL.
        caching = CacheConfig(expiration=sys.maxsize)
    elif mode == "ttl":
        caching = CacheConfig(expiration=cache_section.get("ttl", 15))
    else:
        # Unrecognized mode: fall back to the SDK's default caching.
        caching = CacheConfig.default()

    if kind == "redis":
        return Redis.new_feature_store(
            url=dsn,
            prefix=key_prefix or Redis.DEFAULT_PREFIX,
            caching=caching
        )

    if kind == "dynamodb":
        # Rebuild the endpoint URL as scheme://host[:port] from the DSN.
        endpoint = urlparse(dsn)

        # Imported here so the DynamoDB client library is only required
        # when a dynamodb store is actually configured.
        import boto3

        # Dummy credentials/region suffice for the local test endpoint.
        client_options = {
            'endpoint_url': f"{endpoint.scheme}://{endpoint.netloc}",
            'region_name': 'us-east-1',
            'aws_access_key_id': 'dummy',
            'aws_secret_access_key': 'dummy'
        }

        return DynamoDB.new_feature_store(
            table_name="sdk-contract-tests",
            prefix=key_prefix,
            dynamodb_opts=client_options,
            caching=caching
        )

    if kind == "consul":
        # Accept DSNs with or without a scheme; urlparse needs one to
        # split host and port correctly.
        location = urlparse(dsn if '://' in dsn else f'http://{dsn}')

        return Consul.new_feature_store(
            host=location.hostname or 'localhost',
            port=location.port or 8500,
            prefix=key_prefix,
            caching=caching
        )

    raise ValueError(f"Unsupported data store type: {kind}")