4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
@@ -40,6 +40,8 @@ jobs:

- name: Run tests
run: make test-all
env:
LD_SKIP_FLAKY_TESTS: true

- name: Verify typehints
run: make lint
@@ -92,3 +94,5 @@ jobs:

- name: Run tests
run: make test-all
env:
LD_SKIP_FLAKY_TESTS: true
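Note that the LD_SKIP_FLAKY_TESTS toggle above is only exported into the environment; the test suite decides what to do with it. A minimal sketch of one way the suite might honor it, assuming pytest and that the value arrives as the string "true" (the marker name and test below are hypothetical):

import os

import pytest

# Hypothetical marker; the real skip mechanics in this repo may differ.
skip_flaky = pytest.mark.skipif(
    os.environ.get("LD_SKIP_FLAKY_TESTS") == "true",
    reason="LD_SKIP_FLAKY_TESTS is set (e.g. in CI)",
)

@skip_flaky
def test_stream_reconnects_after_network_blip():
    ...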
1 change: 1 addition & 0 deletions Makefile
@@ -38,6 +38,7 @@ test-all: install
.PHONY: lint
lint: #! Run type analysis and linting checks
lint: install
@mkdir -p .mypy_cache
@poetry run mypy ldclient
@poetry run isort --check --atomic ldclient contract-tests
@poetry run pycodestyle ldclient contract-tests
77 changes: 76 additions & 1 deletion contract-tests/client_entity.py
@@ -15,6 +15,13 @@
Stage
)
from ldclient.config import BigSegmentsConfig
from ldclient.datasystem import (
custom,
fdv1_fallback_ds_builder,
polling_ds_builder,
streaming_ds_builder
)
from ldclient.impl.datasourcev2.polling import PollingDataSourceBuilder


class ClientEntity:
@@ -29,7 +36,75 @@ def __init__(self, tag, config):
'version': tags.get('applicationVersion', ''),
}

if config.get("streaming") is not None:
datasystem_config = config.get('dataSystem')
if datasystem_config is not None:
datasystem = custom()

init_configs = datasystem_config.get('initializers')
if init_configs is not None:
initializers = []
for init_config in init_configs:
polling = init_config.get('polling')
if polling is not None:
if polling.get("baseUri") is not None:
opts["base_uri"] = polling["baseUri"]
_set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
initializers.append(polling_ds_builder())

datasystem.initializers(initializers)
sync_config = datasystem_config.get('synchronizers')
if sync_config is not None:
primary = sync_config.get('primary')
secondary = sync_config.get('secondary')

primary_builder = None
secondary_builder = None
fallback_builder = None

if primary is not None:
streaming = primary.get('streaming')
if streaming is not None:
if streaming.get("baseUri") is not None:
opts["stream_uri"] = streaming["baseUri"]
_set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
primary_builder = streaming_ds_builder()
elif primary.get('polling') is not None:
polling = primary.get('polling')
if polling.get("baseUri") is not None:
opts["base_uri"] = polling["baseUri"]
_set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
primary_builder = polling_ds_builder()
fallback_builder = fdv1_fallback_ds_builder()

if secondary is not None:
streaming = secondary.get('streaming')
if streaming is not None:
if streaming.get("baseUri") is not None:
opts["stream_uri"] = streaming["baseUri"]
_set_optional_time_prop(streaming, "initialRetryDelayMs", opts, "initial_reconnect_delay")
secondary_builder = streaming_ds_builder()
elif secondary.get('polling') is not None:
polling = secondary.get('polling')
if polling.get("baseUri") is not None:
opts["base_uri"] = polling["baseUri"]
_set_optional_time_prop(polling, "pollIntervalMs", opts, "poll_interval")
secondary_builder = polling_ds_builder()
fallback_builder = fdv1_fallback_ds_builder()

if primary_builder is not None:
datasystem.synchronizers(primary_builder, secondary_builder)
if fallback_builder is not None:
datasystem.fdv1_compatible_synchronizer(fallback_builder)

if datasystem_config.get("payloadFilter") is not None:
opts["payload_filter_key"] = datasystem_config["payloadFilter"]

opts["datasystem_config"] = datasystem.build()

elif config.get("streaming") is not None:
streaming = config["streaming"]
if streaming.get("baseUri") is not None:
opts["stream_uri"] = streaming["baseUri"]
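Taken together, the branches above boil down to a small builder pipeline: read the harness config, tune opts, then pick builders. A condensed sketch using only names imported in this diff (the builder helpers' exact signatures are assumptions inferred from their use here):

from ldclient.datasystem import custom, polling_ds_builder, streaming_ds_builder

datasystem = custom()
datasystem.initializers([polling_ds_builder()])  # optional one-shot seeding at startup
datasystem.synchronizers(
    streaming_ds_builder(),  # primary: long-lived streaming connection
    polling_ds_builder(),    # secondary: used if the primary fails over
)
opts["datasystem_config"] = datasystem.build()  # opts is the harness's Config kwargs dict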
43 changes: 23 additions & 20 deletions ldclient/client.py
@@ -30,6 +30,8 @@
DataStoreStatusProviderImpl,
DataStoreUpdateSinkImpl
)
from ldclient.impl.datasystem import DataAvailability, DataSystem
from ldclient.impl.datasystem.fdv2 import FDv2
from ldclient.impl.evaluator import Evaluator, error_reason
from ldclient.impl.events.diagnostics import (
_DiagnosticAccumulator,
@@ -51,7 +53,8 @@
DataStoreStatusProvider,
DataStoreUpdateSink,
FeatureStore,
FlagTracker
FlagTracker,
ReadOnlyStore
)
from ldclient.migrations import OpTracker, Stage
from ldclient.plugin import (
@@ -249,14 +252,19 @@ def __start_up(self, start_wait: float):
self.__hooks_lock = ReadWriteLock()
self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook]

# Initialize data system (FDv1) to encapsulate v1 data plumbing
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
FDv1
)
datasystem_config = self._config.datasystem_config
if datasystem_config is None:
# Initialize data system (FDv1) to encapsulate v1 data plumbing
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
FDv1
)

self._data_system: DataSystem = FDv1(self._config)
else:
self._data_system = FDv2(self._config, datasystem_config)

self._data_system = FDv1(self._config)
# Provide flag evaluation function for value-change tracking
self._data_system.set_flag_value_eval_fn(
self._data_system.set_flag_value_eval_fn( # type: ignore
lambda key, context: self.variation(key, context, None)
)
# Expose providers and store from data system
@@ -265,14 +273,13 @@ def __start_up(self, start_wait: float):
self._data_system.data_source_status_provider
)
self.__flag_tracker = self._data_system.flag_tracker
self._store = self._data_system.store # type: FeatureStore

big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
self.__big_segment_store_manager = big_segment_store_manager

self._evaluator = Evaluator(
lambda key: _get_store_item(self._store, FEATURES, key),
lambda key: _get_store_item(self._store, SEGMENTS, key),
lambda key: _get_store_item(self._data_system.store, FEATURES, key),
lambda key: _get_store_item(self._data_system.store, SEGMENTS, key),
lambda key: big_segment_store_manager.get_user_membership(key),
log,
)
@@ -286,7 +293,7 @@ def __start_up(self, start_wait: float):
diagnostic_accumulator = self._set_event_processor(self._config)

# Pass diagnostic accumulator to data system for streaming metrics
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator)
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator) # type: ignore

self.__register_plugins(environment_metadata)

@@ -475,11 +482,7 @@ def is_initialized(self) -> bool:
if self.is_offline() or self._config.use_ldd:
return True

return (
self._data_system._update_processor.initialized()
if self._data_system._update_processor
else False
)
return self._data_system.data_availability.at_least(DataAvailability.CACHED)

def flush(self):
"""Flushes all pending analytics events.
@@ -567,7 +570,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
return EvaluationDetail(default, None, error_reason('CLIENT_NOT_READY')), None

if not self.is_initialized():
if self._store.initialized:
if self._data_system.store.initialized:
log.warning("Feature Flag evaluation attempted before client has initialized - using last known values from feature store for feature key: " + key)
else:
log.warning("Feature Flag evaluation attempted before client has initialized! Feature store unavailable - returning default: " + str(default) + " for feature key: " + key)
@@ -580,7 +583,7 @@ def _evaluate_internal(self, key: str, context: Context, default: Any, event_fac
return EvaluationDetail(default, None, error_reason('USER_NOT_SPECIFIED')), None

try:
flag = _get_store_item(self._store, FEATURES, key)
flag = _get_store_item(self._data_system.store, FEATURES, key)
except Exception as e:
log.error("Unexpected error while retrieving feature flag \"%s\": %s" % (key, repr(e)))
log.debug(traceback.format_exc())
@@ -638,7 +641,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
return FeatureFlagsState(False)

if not self.is_initialized():
if self._store.initialized:
if self._data_system.store.initialized:
log.warning("all_flags_state() called before client has finished initializing! Using last known values from feature store")
else:
log.warning("all_flags_state() called before client has finished initializing! Feature store unavailable - returning empty state")
@@ -653,7 +656,7 @@ def all_flags_state(self, context: Context, **kwargs) -> FeatureFlagsState:
with_reasons = kwargs.get('with_reasons', False)
details_only_if_tracked = kwargs.get('details_only_for_tracked_flags', False)
try:
flags_map = self._store.all(FEATURES, lambda x: x)
flags_map = self._data_system.store.all(FEATURES, lambda x: x)
if flags_map is None:
raise ValueError("feature store error")
except Exception as e:
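The is_initialized change above is the notable behavioral shift in this file: instead of reaching into the private _update_processor, the client now asks the data system whether its data has reached at least the CACHED availability level. A minimal sketch of the ordering semantics that check relies on (only CACHED and at_least appear in this diff; the other members and their values are illustrative assumptions):

from enum import IntEnum

class DataAvailability(IntEnum):
    DEFAULTS = 0   # no data yet; evaluations would fall back to defaults
    CACHED = 1     # data loaded from a persistent store or an initializer
    REFRESHED = 2  # fresh data received from a synchronizer

    def at_least(self, level: "DataAvailability") -> bool:
        # CACHED satisfies at_least(CACHED), and so does REFRESHED.
        return self >= level

Under this reading, a client whose store was seeded from a cache reports initialized even before the first synchronizer payload arrives.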
46 changes: 45 additions & 1 deletion ldclient/config.py
@@ -4,8 +4,9 @@
Note that the same class can also be imported from the ``ldclient.client`` submodule.
"""

from dataclasses import dataclass
from threading import Event
from typing import Callable, List, Optional, Set
from typing import Callable, List, Optional, Set, TypeVar

from ldclient.feature_store import InMemoryFeatureStore
from ldclient.hook import Hook
@@ -17,8 +18,11 @@
from ldclient.interfaces import (
BigSegmentStore,
DataSourceUpdateSink,
DataStoreMode,
EventProcessor,
FeatureStore,
Initializer,
Synchronizer,
UpdateProcessor
)
from ldclient.plugin import Plugin
@@ -152,6 +156,34 @@ def disable_ssl_verification(self) -> bool:
return self.__disable_ssl_verification


T = TypeVar("T")

Builder = Callable[['Config'], T]


@dataclass(frozen=True)
class DataSystemConfig:
"""Configuration for LaunchDarkly's data acquisition strategy."""

initializers: Optional[List[Builder[Initializer]]]
"""The initializers for the data system."""

primary_synchronizer: Optional[Builder[Synchronizer]]
"""The primary synchronizer for the data system."""

secondary_synchronizer: Optional[Builder[Synchronizer]] = None
"""The secondary synchronizers for the data system."""

data_store_mode: DataStoreMode = DataStoreMode.READ_WRITE
"""The data store mode specifies the mode in which the persistent store will operate, if present."""

data_store: Optional[FeatureStore] = None
"""The (optional) persistent data store instance."""

fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
"""An optional fallback synchronizer that will read from FDv1"""


class Config:
"""Advanced configuration options for the SDK client.

@@ -194,6 +226,7 @@ def __init__(
enable_event_compression: bool = False,
omit_anonymous_contexts: bool = False,
payload_filter_key: Optional[str] = None,
datasystem_config: Optional[DataSystemConfig] = None,
):
"""
:param sdk_key: The SDK key for your LaunchDarkly account. This is always required.
Expand Down Expand Up @@ -264,6 +297,7 @@ def __init__(
:param enable_event_compression: Whether or not to enable GZIP compression for outgoing events.
:param omit_anonymous_contexts: Sets whether anonymous contexts should be omitted from index and identify events.
:param payload_filter_key: The payload filter is used to selectively limit the flags and segments delivered in the data source payload.
:param datasystem_config: Configuration for the upcoming enhanced data system design. This is experimental and should not be set without direction from LaunchDarkly support.
"""
self.__sdk_key = validate_sdk_key_format(sdk_key, log)

@@ -303,6 +337,7 @@ def __init__(
self.__payload_filter_key = payload_filter_key
self._data_source_update_sink: Optional[DataSourceUpdateSink] = None
self._instance_id: Optional[str] = None
self._datasystem_config = datasystem_config

def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config':
"""Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key.
Expand Down Expand Up @@ -546,6 +581,15 @@ def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]:
"""
return self._data_source_update_sink

@property
def datasystem_config(self) -> Optional[DataSystemConfig]:
"""
Configuration for the upcoming enhanced data system design. This is
experimental and should not be set without direction from LaunchDarkly
support.
"""
return self._datasystem_config

def _validate(self):
if self.offline is False and self.sdk_key == '':
log.warning("Missing or blank SDK key")
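To see how the new pieces compose, here is a hedged sketch of building the experimental configuration by hand, reusing the builder helpers the contract tests import earlier in this PR (that those helpers return Builder[...] callables suitable for these fields is an assumption):

from ldclient.config import Config, DataSystemConfig
from ldclient.datasystem import polling_ds_builder, streaming_ds_builder

data_system = DataSystemConfig(
    initializers=[polling_ds_builder()],          # seed the store once at startup
    primary_synchronizer=streaming_ds_builder(),  # keep data fresh via streaming
    secondary_synchronizer=polling_ds_builder(),  # fall back to polling on failure
)

config = Config(sdk_key="your-sdk-key", datasystem_config=data_system)

Because every field except initializers and primary_synchronizer has a default, the frozen dataclass keeps minimal configurations terse while still pinning down data_store_mode and the FDv1 fallback when they are needed.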