Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def __start_up(self, start_wait: float):

self._data_system: DataSystem = FDv1(self._config)
else:
self._data_system = FDv2(datasystem_config, disabled=self._config.offline)
self._data_system = FDv2(self._config, datasystem_config)

# Provide flag evaluation function for value-change tracking
self._data_system.set_flag_value_eval_fn( # type: ignore
Expand Down
2 changes: 1 addition & 1 deletion ldclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def disable_ssl_verification(self) -> bool:

T = TypeVar("T")

Builder = Callable[[], T]
Builder = Callable[['Config'], T]


@dataclass(frozen=True)
Expand Down
33 changes: 16 additions & 17 deletions ldclient/impl/datasystem/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

T = TypeVar("T")

Builder = Callable[[], T]
Builder = Callable[[LDConfig], T]


class ConfigBuilder: # pylint: disable=too-few-public-methods
Expand Down Expand Up @@ -77,8 +77,8 @@ def build(self) -> DataSystemConfig:
)


def __polling_ds_builder(config: LDConfig) -> Builder[PollingDataSource]:
def builder() -> PollingDataSource:
def __polling_ds_builder() -> Builder[PollingDataSource]:
def builder(config: LDConfig) -> PollingDataSource:
requester = Urllib3PollingRequester(config)
polling_ds = PollingDataSourceBuilder(config)
polling_ds.requester(requester)
Expand All @@ -88,14 +88,14 @@ def builder() -> PollingDataSource:
return builder


def __streaming_ds_builder(config: LDConfig) -> Builder[StreamingDataSource]:
def builder() -> StreamingDataSource:
def __streaming_ds_builder() -> Builder[StreamingDataSource]:
def builder(config: LDConfig) -> StreamingDataSource:
return StreamingDataSourceBuilder(config).build()

return builder


def default(config: LDConfig) -> ConfigBuilder:
def default() -> ConfigBuilder:
"""
Default is LaunchDarkly's recommended flag data acquisition strategy.

Expand All @@ -109,8 +109,8 @@ def default(config: LDConfig) -> ConfigBuilder:
for updates.
"""

polling_builder = __polling_ds_builder(config)
streaming_builder = __streaming_ds_builder(config)
polling_builder = __polling_ds_builder()
streaming_builder = __streaming_ds_builder()

builder = ConfigBuilder()
builder.initializers([polling_builder])
Expand All @@ -119,29 +119,29 @@ def default(config: LDConfig) -> ConfigBuilder:
return builder


def streaming(config: LDConfig) -> ConfigBuilder:
def streaming() -> ConfigBuilder:
"""
Streaming configures the SDK to efficiently streams flag/segment data
in the background, allowing evaluations to operate on the latest data
with no additional latency.
"""

streaming_builder = __streaming_ds_builder(config)
streaming_builder = __streaming_ds_builder()

builder = ConfigBuilder()
builder.synchronizers(streaming_builder)

return builder


def polling(config: LDConfig) -> ConfigBuilder:
def polling() -> ConfigBuilder:
"""
Polling configures the SDK to regularly poll an endpoint for
flag/segment data in the background. This is less efficient than
streaming, but may be necessary in some network environments.
"""

polling_builder: Builder[Synchronizer] = __polling_ds_builder(config)
polling_builder: Builder[Synchronizer] = __polling_ds_builder()

builder = ConfigBuilder()
builder.synchronizers(polling_builder)
Expand All @@ -160,25 +160,24 @@ def custom() -> ConfigBuilder:
return ConfigBuilder()


# TODO(fdv2): Need to update these so they don't rely on the LDConfig
def daemon(config: LDConfig, store: FeatureStore) -> ConfigBuilder:
def daemon(store: FeatureStore) -> ConfigBuilder:
"""
Daemon configures the SDK to read from a persistent store integration
that is populated by Relay Proxy or other SDKs. The SDK will not connect
to LaunchDarkly. In this mode, the SDK never writes to the data store.
"""
return default(config).data_store(store, DataStoreMode.READ_ONLY)
return default().data_store(store, DataStoreMode.READ_ONLY)


def persistent_store(config: LDConfig, store: FeatureStore) -> ConfigBuilder:
def persistent_store(store: FeatureStore) -> ConfigBuilder:
"""
PersistentStore is similar to Default, with the addition of a persistent
store integration. Before data has arrived from LaunchDarkly, the SDK is
able to evaluate flags using data from the persistent store. Once fresh
data is available, the SDK will no longer read from the persistent store,
although it will keep it up-to-date.
"""
return default(config).data_store(store, DataStoreMode.READ_WRITE)
return default().data_store(store, DataStoreMode.READ_WRITE)


# TODO(fdv2): Implement these methods
Expand Down
61 changes: 31 additions & 30 deletions ldclient/impl/datasystem/fdv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from threading import Event, Thread
from typing import Any, Callable, Dict, List, Mapping, Optional

from ldclient.config import Builder, DataSystemConfig
from ldclient.config import Builder, Config, DataSystemConfig
from ldclient.feature_store import _FeatureStoreDataSetSorter
from ldclient.impl.datasourcev2.status import (
DataSourceStatusProviderImpl,
Expand Down Expand Up @@ -153,8 +153,8 @@ class FDv2:

def __init__(
self,
config: DataSystemConfig,
disabled: bool = False,
config: Config,
data_system_config: DataSystemConfig,
):
"""
Initialize a new FDv2 data system.
Expand All @@ -165,10 +165,11 @@ def __init__(
:param disabled: Whether the data system is disabled (offline mode)
"""
self._config = config
self._primary_synchronizer_builder: Optional[Builder[Synchronizer]] = config.primary_synchronizer
self._secondary_synchronizer_builder = config.secondary_synchronizer
self._fdv1_fallback_synchronizer_builder = config.fdv1_fallback_synchronizer
self._disabled = disabled
self._data_system_config = data_system_config
self._primary_synchronizer_builder: Optional[Builder[Synchronizer]] = data_system_config.primary_synchronizer
self._secondary_synchronizer_builder = data_system_config.secondary_synchronizer
self._fdv1_fallback_synchronizer_builder = data_system_config.fdv1_fallback_synchronizer
self._disabled = self._config.offline

# Diagnostic accumulator provided by client for streaming metrics
# TODO(fdv2): Either we need to use this, or we need to provide it to
Expand All @@ -188,10 +189,10 @@ def __init__(
self._data_store_status_provider = DataStoreStatusProviderImpl(None, Listeners())

# Configure persistent store if provided
if self._config.data_store is not None:
self._data_store_status_provider = DataStoreStatusProviderImpl(self._config.data_store, Listeners())
writable = self._config.data_store_mode == DataStoreMode.READ_WRITE
wrapper = FeatureStoreClientWrapper(self._config.data_store, self._data_store_status_provider)
if self._data_system_config.data_store is not None:
self._data_store_status_provider = DataStoreStatusProviderImpl(self._data_system_config.data_store, Listeners())
writable = self._data_system_config.data_store_mode == DataStoreMode.READ_WRITE
wrapper = FeatureStoreClientWrapper(self._data_system_config.data_store, self._data_store_status_provider)
self._store.with_persistence(
wrapper, writable, self._data_store_status_provider
)
Expand All @@ -208,8 +209,8 @@ def __init__(

# Track configuration
self._configured_with_data_sources = (
(config.initializers is not None and len(config.initializers) > 0)
or config.primary_synchronizer is not None
(data_system_config.initializers is not None and len(data_system_config.initializers) > 0)
or data_system_config.primary_synchronizer is not None
)

def start(self, set_on_ready: Event):
Expand Down Expand Up @@ -268,32 +269,32 @@ def _run_main_loop(self, set_on_ready: Event):
self._run_synchronizers(set_on_ready)

except Exception as e:
log.error(f"Error in FDv2 main loop: {e}")
log.error("Error in FDv2 main loop: %s", e)
# Ensure ready event is set even on error
if not set_on_ready.is_set():
set_on_ready.set()

def _run_initializers(self, set_on_ready: Event):
"""Run initializers to get initial data."""
if self._config.initializers is None:
if self._data_system_config.initializers is None:
return

for initializer_builder in self._config.initializers:
for initializer_builder in self._data_system_config.initializers:
if self._stop_event.is_set():
return

try:
initializer = initializer_builder()
log.info(f"Attempting to initialize via {initializer.name}")
initializer = initializer_builder(self._config)
log.info("Attempting to initialize via %s", initializer.name)

basis_result = initializer.fetch()

if isinstance(basis_result, _Fail):
log.warning(f"Initializer {initializer.name} failed: {basis_result.error}")
log.warning("Initializer %s failed: %s", initializer.name, basis_result.error)
continue

basis = basis_result.value
log.info(f"Initialized via {initializer.name}")
log.info("Initialized via %s", initializer.name)

# Apply the basis to the store
self._store.apply(basis.change_set, basis.persist)
Expand All @@ -302,12 +303,12 @@ def _run_initializers(self, set_on_ready: Event):
if not set_on_ready.is_set():
set_on_ready.set()
except Exception as e:
log.error(f"Initializer failed with exception: {e}")
log.error("Initializer failed with exception: %s", e)

def _run_synchronizers(self, set_on_ready: Event):
"""Run synchronizers to keep data up-to-date."""
# If no primary synchronizer configured, just set ready and return
if self._config.primary_synchronizer is None:
if self._data_system_config.primary_synchronizer is None:
if not set_on_ready.is_set():
set_on_ready.set()
return
Expand All @@ -318,8 +319,8 @@ def synchronizer_loop(self: 'FDv2'):
while not self._stop_event.is_set() and self._primary_synchronizer_builder is not None:
# Try primary synchronizer
try:
primary_sync = self._primary_synchronizer_builder()
log.info(f"Primary synchronizer {primary_sync.name} is starting")
primary_sync = self._primary_synchronizer_builder(self._config)
log.info("Primary synchronizer %s is starting", primary_sync.name)

remove_sync, fallback_v1 = self._consume_synchronizer_results(
primary_sync, set_on_ready, self._fallback_condition
Expand All @@ -345,8 +346,8 @@ def synchronizer_loop(self: 'FDv2'):
if self._secondary_synchronizer_builder is None:
continue

secondary_sync = self._secondary_synchronizer_builder()
log.info(f"Secondary synchronizer {secondary_sync.name} is starting")
secondary_sync = self._secondary_synchronizer_builder(self._config)
log.info("Secondary synchronizer %s is starting", secondary_sync.name)

remove_sync, fallback_v1 = self._consume_synchronizer_results(
secondary_sync, set_on_ready, self._recovery_condition
Expand All @@ -368,11 +369,11 @@ def synchronizer_loop(self: 'FDv2'):

log.info("Recovery condition met, returning to primary synchronizer")
except Exception as e:
log.error(f"Failed to build primary synchronizer: {e}")
log.error("Failed to build primary synchronizer: %s", e)
break

except Exception as e:
log.error(f"Error in synchronizer loop: {e}")
log.error("Error in synchronizer loop: %s", e)
finally:
# Ensure we always set the ready event when exiting
if not set_on_ready.is_set():
Expand Down Expand Up @@ -400,7 +401,7 @@ def _consume_synchronizer_results(
"""
try:
for update in synchronizer.sync():
log.info(f"Synchronizer {synchronizer.name} update: {update.state}")
log.info("Synchronizer %s update: %s", synchronizer.name, update.state)
if self._stop_event.is_set():
return False, False

Expand All @@ -425,7 +426,7 @@ def _consume_synchronizer_results(
return False, False

except Exception as e:
log.error(f"Error consuming synchronizer results: {e}")
log.error("Error consuming synchronizer results: %s", e)
return True, False

return True, False
Expand Down
5 changes: 3 additions & 2 deletions ldclient/integrations/test_datav2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import copy
from typing import Any, Dict, List, Optional, Set, Union

from ldclient.config import Config
from ldclient.context import Context
from ldclient.impl.integrations.test_datav2.test_data_sourcev2 import (
_TestDataSourceV2
Expand Down Expand Up @@ -693,15 +694,15 @@ def _add_instance(self, instance):
finally:
self._lock.unlock()

def build_initializer(self) -> _TestDataSourceV2:
def build_initializer(self, _: Config) -> _TestDataSourceV2:
"""
Creates an initializer that can be used with the FDv2 data system.

:return: a test data initializer
"""
return _TestDataSourceV2(self)

def build_synchronizer(self) -> _TestDataSourceV2:
def build_synchronizer(self, _: Config) -> _TestDataSourceV2:
"""
Creates a synchronizer that can be used with the FDv2 data system.

Expand Down
12 changes: 3 additions & 9 deletions ldclient/testing/impl/datasystem/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ def test_custom_builder():

def test_default_config_builder():
"""Test that default() returns a properly configured ConfigBuilder."""
mock_ld_config = Mock(spec=LDConfig)

builder = default(mock_ld_config)
builder = default()

assert isinstance(builder, ConfigBuilder)
# The actual implementation details would be tested in integration tests
Expand All @@ -137,9 +135,7 @@ def test_default_config_builder():

def test_streaming_config_builder():
"""Test that streaming() returns a properly configured ConfigBuilder."""
mock_ld_config = Mock(spec=LDConfig)

builder = streaming(mock_ld_config)
builder = streaming()

assert isinstance(builder, ConfigBuilder)
# The actual implementation details would be tested in integration tests
Expand All @@ -148,9 +144,7 @@ def test_streaming_config_builder():

def test_polling_config_builder():
"""Test that polling() returns a properly configured ConfigBuilder."""
mock_ld_config = Mock(spec=LDConfig)

builder = polling(mock_ld_config)
builder = polling()

assert isinstance(builder, ConfigBuilder)
# The actual implementation details would be tested in integration tests
Expand Down
Loading