Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
DataStoreStatusProviderImpl,
DataStoreUpdateSinkImpl
)
from ldclient.impl.datasystem import DataAvailability, DataSystem
from ldclient.impl.datasystem.fdv2 import FDv2
from ldclient.impl.evaluator import Evaluator, error_reason
from ldclient.impl.events.diagnostics import (
_DiagnosticAccumulator,
Expand Down Expand Up @@ -249,14 +251,19 @@ def __start_up(self, start_wait: float):
self.__hooks_lock = ReadWriteLock()
self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook]

# Initialize data system (FDv1) to encapsulate v1 data plumbing
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
FDv1
)
datasystem_config = self._config.datasystem_config
if datasystem_config is None:
# Initialize data system (FDv1) to encapsulate v1 data plumbing
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
FDv1
)

self._data_system: DataSystem = FDv1(self._config)
else:
self._data_system = FDv2(datasystem_config, disabled=self._config.offline)

self._data_system = FDv1(self._config)
# Provide flag evaluation function for value-change tracking
self._data_system.set_flag_value_eval_fn(
self._data_system.set_flag_value_eval_fn( # type: ignore
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These ignores are because the data system protocol doesn't expose these methods, but both fdv1 and fdv2 implementations do. So we have to tell the linter to trust us in a few places.

lambda key, context: self.variation(key, context, None)
)
# Expose providers and store from data system
Expand All @@ -265,7 +272,7 @@ def __start_up(self, start_wait: float):
self._data_system.data_source_status_provider
)
self.__flag_tracker = self._data_system.flag_tracker
self._store = self._data_system.store # type: FeatureStore
self._store: FeatureStore = self._data_system.store # type: ignore

big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
self.__big_segment_store_manager = big_segment_store_manager
Expand All @@ -286,7 +293,7 @@ def __start_up(self, start_wait: float):
diagnostic_accumulator = self._set_event_processor(self._config)

# Pass diagnostic accumulator to data system for streaming metrics
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator)
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator) # type: ignore

self.__register_plugins(environment_metadata)

Expand Down Expand Up @@ -475,11 +482,7 @@ def is_initialized(self) -> bool:
if self.is_offline() or self._config.use_ldd:
return True

return (
self._data_system._update_processor.initialized()
if self._data_system._update_processor
else False
)
return self._data_system.data_availability.at_least(DataAvailability.CACHED)

def flush(self):
"""Flushes all pending analytics events.
Expand Down
42 changes: 41 additions & 1 deletion ldclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
Note that the same class can also be imported from the ``ldclient.client`` submodule.
"""

from dataclasses import dataclass
from threading import Event
from typing import Callable, List, Optional, Set
from typing import Callable, List, Optional, Set, TypeVar

from ldclient.feature_store import InMemoryFeatureStore
from ldclient.hook import Hook
from ldclient.impl.datasystem import Initializer, Synchronizer
from ldclient.impl.util import (
log,
validate_application_info,
Expand Down Expand Up @@ -152,6 +154,32 @@ def disable_ssl_verification(self) -> bool:
return self.__disable_ssl_verification


T = TypeVar("T")

Builder = Callable[[], T]


@dataclass(frozen=True)
class DataSystemConfig:
"""
Configuration for LaunchDarkly's data acquisition strategy.
"""

initializers: Optional[List[Builder[Initializer]]]
"""The initializers for the data system."""

primary_synchronizer: Builder[Synchronizer]
"""The primary synchronizer for the data system."""

secondary_synchronizer: Optional[Builder[Synchronizer]] = None
"""The secondary synchronizers for the data system."""

# TODO(fdv2): Implement this synchronizer up and hook it up everywhere.
# TODO(fdv2): Remove this when FDv2 is fully launched
fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
"""An optional fallback synchronizer that will read from FDv1"""


class Config:
"""Advanced configuration options for the SDK client.

Expand Down Expand Up @@ -194,6 +222,7 @@ def __init__(
enable_event_compression: bool = False,
omit_anonymous_contexts: bool = False,
payload_filter_key: Optional[str] = None,
datasystem_config: Optional[DataSystemConfig] = None,
):
"""
:param sdk_key: The SDK key for your LaunchDarkly account. This is always required.
Expand Down Expand Up @@ -264,6 +293,7 @@ def __init__(
:param enable_event_compression: Whether or not to enable GZIP compression for outgoing events.
:param omit_anonymous_contexts: Sets whether anonymous contexts should be omitted from index and identify events.
:param payload_filter_key: The payload filter is used to selectively limited the flags and segments delivered in the data source payload.
:param datasystem_config: Configuration for the upcoming enhanced data system design. This is experimental and should not be set without direction from LaunchDarkly support.
"""
self.__sdk_key = validate_sdk_key_format(sdk_key, log)

Expand Down Expand Up @@ -303,6 +333,7 @@ def __init__(
self.__payload_filter_key = payload_filter_key
self._data_source_update_sink: Optional[DataSourceUpdateSink] = None
self._instance_id: Optional[str] = None
self._datasystem_config = datasystem_config

def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config':
"""Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key.
Expand Down Expand Up @@ -546,6 +577,15 @@ def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]:
"""
return self._data_source_update_sink

@property
def datasystem_config(self) -> Optional[DataSystemConfig]:
"""
Configuration for the upcoming enhanced data system design. This is
experimental and should not be set without direction from LaunchDarkly
support.
"""
return self._datasystem_config

def _validate(self):
if self.offline is False and self.sdk_key == '':
log.warning("Missing or blank SDK key")
Expand Down
1 change: 1 addition & 0 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(
"ldclient.datasource.polling", poll_interval, 0, self._poll
)

@property
def name(self) -> str:
"""Returns the name of the initializer."""
return "PollingDataSourceV2"
Expand Down
57 changes: 57 additions & 0 deletions ldclient/impl/datasourcev2/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import time
from typing import Callable, Optional

from ldclient.impl.listeners import Listeners
from ldclient.impl.rwlock import ReadWriteLock
from ldclient.interfaces import (
DataSourceErrorInfo,
DataSourceState,
DataSourceStatus,
DataSourceStatusProvider
)


class DataSourceStatusProviderImpl(DataSourceStatusProvider):
def __init__(self, listeners: Listeners):
self.__listeners = listeners
self.__status = DataSourceStatus(DataSourceState.INITIALIZING, 0, None)
self.__lock = ReadWriteLock()

@property
def status(self) -> DataSourceStatus:
self.__lock.rlock()
status = self.__status
self.__lock.runlock()

return status

def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]):
status_to_broadcast = None

try:
self.__lock.lock()
old_status = self.__status

if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING:
new_state = DataSourceState.INITIALIZING

if new_state == old_status.state and new_error is None:
return

new_since = self.__status.since if new_state == self.__status.state else time.time()
new_error = self.__status.error if new_error is None else new_error

self.__status = DataSourceStatus(new_state, new_since, new_error)

status_to_broadcast = self.__status
finally:
self.__lock.unlock()

if status_to_broadcast is not None:
self.__listeners.notify(status_to_broadcast)

def add_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.add(listener)

def remove_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.remove(listener)
7 changes: 7 additions & 0 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ def __init__(
self._config = config
self._sse: Optional[SSEClient] = None

@property
def name(self) -> str:
"""
Returns the name of the synchronizer, which is used for logging and debugging.
"""
return "streaming"

def sync(self) -> Generator[Update, None, None]:
"""
sync should begin the synchronization process for the data source, yielding
Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/datastore/status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from copy import copy
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING, Callable, Protocol

from ldclient.impl.listeners import Listeners
from ldclient.impl.rwlock import ReadWriteLock
Expand Down
15 changes: 15 additions & 0 deletions ldclient/impl/datasystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ class Initializer(Protocol): # pylint: disable=too-few-public-methods
as new changes occur.
"""

@property
@abstractmethod
def name(self) -> str:
"""
Returns the name of the initializer, which is used for logging and debugging.
"""
raise NotImplementedError

@abstractmethod
def fetch(self) -> BasisResult:
"""
Expand Down Expand Up @@ -188,6 +196,13 @@ class Synchronizer(Protocol): # pylint: disable=too-few-public-methods
of the data source, including any changes that have occurred since the last
synchronization.
"""
@property
@abstractmethod
def name(self) -> str:
"""
Returns the name of the synchronizer, which is used for logging and debugging.
"""
raise NotImplementedError

@abstractmethod
def sync(self) -> Generator[Update, None, None]:
Expand Down
26 changes: 5 additions & 21 deletions ldclient/impl/datasystem/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
Configuration for LaunchDarkly's data acquisition strategy.
"""

from dataclasses import dataclass
from typing import Callable, List, Optional, TypeVar

from ldclient.config import Config as LDConfig
from ldclient.config import DataSystemConfig
from ldclient.impl.datasourcev2.polling import (
PollingDataSource,
PollingDataSourceBuilder,
Expand All @@ -22,22 +22,6 @@
Builder = Callable[[], T]


@dataclass(frozen=True)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved.

class Config:
"""
Configuration for LaunchDarkly's data acquisition strategy.
"""

initializers: Optional[List[Builder[Initializer]]]
"""The initializers for the data system."""

primary_synchronizer: Builder[Synchronizer]
"""The primary synchronizer for the data system."""

secondary_synchronizer: Optional[Builder[Synchronizer]]
"""The secondary synchronizers for the data system."""


class ConfigBuilder: # pylint: disable=too-few-public-methods
"""
Builder for the data system configuration.
Expand All @@ -47,7 +31,7 @@ class ConfigBuilder: # pylint: disable=too-few-public-methods
_primary_synchronizer: Optional[Builder[Synchronizer]] = None
_secondary_synchronizer: Optional[Builder[Synchronizer]] = None

def initializers(self, initializers: List[Builder[Initializer]]) -> "ConfigBuilder":
def initializers(self, initializers: Optional[List[Builder[Initializer]]]) -> "ConfigBuilder":
"""
Sets the initializers for the data system.
"""
Expand All @@ -66,14 +50,14 @@ def synchronizers(
self._secondary_synchronizer = secondary
return self

def build(self) -> Config:
def build(self) -> DataSystemConfig:
"""
Builds the data system configuration.
"""
if self._primary_synchronizer is None:
raise ValueError("Primary synchronizer must be set")

return Config(
return DataSystemConfig(
initializers=self._initializers,
primary_synchronizer=self._primary_synchronizer,
secondary_synchronizer=self._secondary_synchronizer,
Expand Down Expand Up @@ -144,7 +128,7 @@ def polling(config: LDConfig) -> ConfigBuilder:
streaming, but may be necessary in some network environments.
"""

polling_builder = __polling_ds_builder(config)
polling_builder: Builder[Synchronizer] = __polling_ds_builder(config)

builder = ConfigBuilder()
builder.synchronizers(polling_builder)
Expand Down
11 changes: 10 additions & 1 deletion ldclient/impl/datasystem/fdv1.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,16 @@ def flag_tracker(self) -> FlagTracker:

@property
def data_availability(self) -> DataAvailability:
return self._data_availability
if self._config.offline:
return DataAvailability.DEFAULTS

if self._update_processor is not None and self._update_processor.initialized():
return DataAvailability.REFRESHED

if self._store_wrapper.initialized:
return DataAvailability.CACHED

return DataAvailability.DEFAULTS

@property
def target_availability(self) -> DataAvailability:
Expand Down
Loading
Loading