Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 2 additions & 76 deletions ldclient/impl/datasourcev2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,80 +10,6 @@
You have been warned.
"""

from abc import abstractmethod
from dataclasses import dataclass
from typing import Generator, Mapping, Optional, Protocol, Tuple
from .polling import PollingResult, Requester

from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet
from ldclient.impl.util import _Result
from ldclient.interfaces import DataSourceErrorInfo, DataSourceState

PollingResult = _Result[Tuple[ChangeSet, Mapping], str]


BasisResult = _Result[Basis, str]


class Initializer(Protocol): # pylint: disable=too-few-public-methods
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These types are more generally a part of the data system, so we should move them there.

"""
Initializer represents a component capable of retrieving a single data
result, such as from the LD polling API.
The intent of initializers is to quickly fetch an initial set of data,
which may be stale but is fast to retrieve. This initial data serves as a
foundation for a Synchronizer to build upon, enabling it to provide updates
as new changes occur.
"""

@abstractmethod
def fetch(self) -> BasisResult:
"""
sync should begin the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
occurs.
"""
raise NotImplementedError


@dataclass(frozen=True)
class Update:
"""
Update represents the results of a synchronizer's ongoing sync
method.
"""

state: DataSourceState
change_set: Optional[ChangeSet] = None
error: Optional[DataSourceErrorInfo] = None
revert_to_fdv1: bool = False
environment_id: Optional[str] = None


class Synchronizer(Protocol): # pylint: disable=too-few-public-methods
"""
Synchronizer represents a component capable of synchronizing data from an external
data source, such as a streaming or polling API.
It is responsible for yielding Update objects that represent the current state
of the data source, including any changes that have occurred since the last
synchronization.
"""

@abstractmethod
def sync(self) -> Generator[Update, None, None]:
"""
sync should begin the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
occurs.
"""
raise NotImplementedError


__all__: list[str] = [
# Initializer-related types
"BasisResult",
"Initializer",
# Synchronizer-related types
"Update",
"Synchronizer",
]
__all__: list[str] = ["PollingResult", "Requester"]
39 changes: 35 additions & 4 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from collections import namedtuple
from threading import Event
from time import time
from typing import Generator, Optional, Protocol
from typing import Generator, Mapping, Optional, Protocol, Tuple
from urllib import parse

import urllib3

from ldclient.impl.datasourcev2 import BasisResult, PollingResult, Update
from ldclient.config import Config
from ldclient.impl.datasystem import BasisResult, Update
from ldclient.impl.datasystem.protocolv2 import (
Basis,
ChangeSet,
Expand Down Expand Up @@ -46,7 +47,7 @@
POLLING_ENDPOINT = "/sdk/poll"


CacheEntry = namedtuple("CacheEntry", ["data", "etag"])
PollingResult = _Result[Tuple[ChangeSet, Mapping], str]


class Requester(Protocol): # pylint: disable=too-few-public-methods
Expand All @@ -68,6 +69,9 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
raise NotImplementedError


CacheEntry = namedtuple("CacheEntry", ["data", "etag"])


class PollingDataSource:
"""
PollingDataSource is a data source that can retrieve information from
Expand Down Expand Up @@ -206,7 +210,7 @@ class Urllib3PollingRequester:
requests.
"""

def __init__(self, config):
def __init__(self, config: Config):
self._etag = None
self._http = _http_factory(config).create_pool_manager(1, config.base_uri)
self._config = config
Expand Down Expand Up @@ -335,3 +339,30 @@ def polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]:
)

return _Fail(error="didn't receive any known protocol events in polling payload")


class PollingDataSourceBuilder:
"""
Builder for a PollingDataSource.
"""

def __init__(self, config: Config):
self._config = config
self._requester: Optional[Requester] = None

def requester(self, requester: Requester) -> "PollingDataSourceBuilder":
"""Sets a custom Requester for the PollingDataSource."""
self._requester = requester
return self

def build(self) -> PollingDataSource:
"""Builds the PollingDataSource with the configured parameters."""
requester = (
self._requester
if self._requester is not None
else Urllib3PollingRequester(self._config)
)

return PollingDataSource(
poll_interval=self._config.poll_interval, requester=requester
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Polling Config Mismatch Causes SDK Startup Failure

PollingDataSourceBuilder.build incorrectly attempts to access config.poll_interval instead of config.polling_interval. This mismatch causes an AttributeError at runtime when using default() or polling() config builders, preventing the SDK from starting with polling enabled.

Fix in Cursor Fix in Web

18 changes: 16 additions & 2 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ld_eventsource.errors import HTTPStatusError

from ldclient.config import Config
from ldclient.impl.datasourcev2 import Synchronizer, Update
from ldclient.impl.datasystem import Synchronizer, Update
from ldclient.impl.datasystem.protocolv2 import (
ChangeSetBuilder,
DeleteObject,
Expand Down Expand Up @@ -110,7 +110,7 @@ def create_sse_client(config: Config) -> SSEClientImpl:
)


class StreamingSynchronizer(Synchronizer):
class StreamingDataSource(Synchronizer):
"""
StreamingSynchronizer is a specific type of Synchronizer that handles
streaming data sources.
Expand Down Expand Up @@ -386,3 +386,17 @@ def __enter__(self):
def __exit__(self, type, value, traceback):
# self.stop()
pass


class StreamingDataSourceBuilder: # disable: pylint: disable=too-few-public-methods
"""
Builder for a StreamingDataSource.
"""

def __init__(self, config: Config):
self._config = config

def build(self) -> StreamingDataSource:
"""Builds a StreamingDataSource instance with the configured parameters."""
# TODO(fdv2): Add in the other controls here.
return StreamingDataSource(self._config)
Loading
Loading