From 8cd1ce689b527c602cb7a91840782a2344d45590 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 30 Jul 2025 10:03:45 -0400 Subject: [PATCH 1/3] chore: Introducing Synchronizer protocol & streaming implementation --- ldclient/impl/datasourcev2/__init__.py | 71 +++ ldclient/impl/datasourcev2/polling.py | 29 +- ldclient/impl/datasourcev2/streaming.py | 388 +++++++++++++++ ldclient/impl/datasystem/protocolv2.py | 79 +++ .../test_streaming_synchronizer.py | 458 ++++++++++++++++++ 5 files changed, 1000 insertions(+), 25 deletions(-) create mode 100644 ldclient/impl/datasourcev2/streaming.py create mode 100644 ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py diff --git a/ldclient/impl/datasourcev2/__init__.py b/ldclient/impl/datasourcev2/__init__.py index 1979b2ce..15284142 100644 --- a/ldclient/impl/datasourcev2/__init__.py +++ b/ldclient/impl/datasourcev2/__init__.py @@ -1,6 +1,77 @@ """ This module houses FDv2 types and implementations of synchronizers and initializers for the datasystem. + +All types and implementations in this module are considered internal +and are not part of the public API of the LaunchDarkly Python SDK. +They are subject to change without notice and should not be used directly +by users of the SDK. + +You have been warned. """ +from abc import abstractmethod +from dataclasses import dataclass +from typing import Iterable, Mapping, Optional, Protocol, Tuple + +from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector +from ldclient.impl.util import _Result +from ldclient.interfaces import DataSourceErrorInfo, DataSourceState + +PollingResult = _Result[Tuple[ChangeSet, Mapping], str] + + +class PollingRequester(Protocol): # pylint: disable=too-few-public-methods + """ + PollingRequester allows PollingDataSource to delegate fetching data to + another component. + + This is useful for testing the PollingDataSource without needing to set up + a test HTTP server. + """ + + @abstractmethod + def fetch(self, selector: Optional[Selector]) -> PollingResult: + """ + Fetches the data for the given selector. + Returns a Result containing a tuple of ChangeSet and any request headers, + or an error if the data could not be retrieved. + """ + raise NotImplementedError + + +@dataclass(frozen=True) +class Update: + """ + Update represents the results of a synchronizer's ongoing sync + method. + """ + + state: DataSourceState + change_set: Optional[ChangeSet] = None + error: Optional[DataSourceErrorInfo] = None + revert_to_fdv1: bool = False + environment_id: Optional[str] = None + + +class Synchronizer(Protocol): # pylint: disable=too-few-public-methods + """ + Synchronizer represents a component capable of synchronizing data from an external + data source, such as a streaming or polling API. + + It is responsible for yielding Update objects that represent the current state + of the data source, including any changes that have occurred since the last + synchronization. + """ + + @abstractmethod + def sync(self) -> Iterable[Update]: + """ + sync should begin the synchronization process for the data source, yielding + Update objects until the connection is closed or an unrecoverable error + occurs. + """ + raise NotImplementedError + + __all__: list[str] = [] diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index 6b05e9f6..5456342b 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -1,16 +1,16 @@ """ -Default implementation of the polling synchronizer and initializer. +This module contains the implementations of a polling synchronizer and +initializer, along with any required supporting classes and protocols. """ import json -from abc import abstractmethod from collections import namedtuple -from collections.abc import Mapping -from typing import Optional, Protocol, Tuple +from typing import Iterable, Optional from urllib import parse import urllib3 +from ldclient.impl.datasourcev2 import PollingRequester, PollingResult, Update from ldclient.impl.datasystem.protocolv2 import ( Basis, ChangeSet, @@ -38,27 +38,6 @@ POLLING_ENDPOINT = "/sdk/poll" -PollingResult = _Result[Tuple[ChangeSet, Mapping], str] - - -class PollingRequester(Protocol): # pylint: disable=too-few-public-methods - """ - PollingRequester allows PollingDataSource to delegate fetching data to - another component. - - This is useful for testing the PollingDataSource without needing to set up - a test HTTP server. - """ - - @abstractmethod - def fetch(self, selector: Optional[Selector]) -> PollingResult: - """ - Fetches the data for the given selector. - Returns a Result containing a tuple of ChangeSet and any request headers, - or an error if the data could not be retrieved. - """ - raise NotImplementedError - CacheEntry = namedtuple("CacheEntry", ["data", "etag"]) diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py new file mode 100644 index 00000000..89177dd4 --- /dev/null +++ b/ldclient/impl/datasourcev2/streaming.py @@ -0,0 +1,388 @@ +""" +This module contains the implementations of a streaming synchronizer, along +with any required supporting classes and protocols. +""" + +import json +from abc import abstractmethod +from time import time +from typing import Callable, Iterable, Optional, Protocol, Tuple +from urllib import parse + +from ld_eventsource import SSEClient as SSEClientImpl +from ld_eventsource.actions import Action, Event, Fault +from ld_eventsource.config import ( + ConnectStrategy, + ErrorStrategy, + RetryDelayStrategy +) +from ld_eventsource.errors import HTTPStatusError + +from ldclient.config import Config +from ldclient.impl.datasourcev2 import Synchronizer, Update +from ldclient.impl.datasystem.protocolv2 import ( + ChangeSetBuilder, + DeleteObject, + Error, + EventName, + Goodbye, + IntentCode, + PutObject, + Selector, + ServerIntent +) +from ldclient.impl.http import HTTPFactory, _http_factory +from ldclient.impl.util import ( + http_error_message, + is_http_error_recoverable, + log +) +from ldclient.interfaces import ( + DataSourceErrorInfo, + DataSourceErrorKind, + DataSourceState +) + +# allows for up to 5 minutes to elapse without any data sent across the stream. +# The heartbeats sent as comments on the stream will keep this from triggering +STREAM_READ_TIMEOUT = 5 * 60 + +MAX_RETRY_DELAY = 30 +BACKOFF_RESET_INTERVAL = 60 +JITTER_RATIO = 0.5 + +STREAMING_ENDPOINT = "/sdk/stream" + + +class SSEClient(Protocol): # pylint: disable=too-few-public-methods + """ + SSEClient is a protocol that defines the interface for a client that can + connect to a Server-Sent Events (SSE) stream and provide an iterable of + actions received from that stream. + """ + + @property + @abstractmethod + def all(self) -> Iterable[Action]: + """ + Returns an iterable of all actions received from the SSE stream. + """ + raise NotImplementedError + + +SseClientBuilder = Callable[[Config], SSEClient] + + +# TODO(sdk-1391): Pass a selector-retrieving function through so it can +# re-connect with the last known status. +def create_sse_client(config: Config) -> SSEClientImpl: + """ " + create_sse_client creates an SSEClientImpl instance configured to connect + to the LaunchDarkly streaming endpoint. + """ + uri = config.stream_base_uri + STREAMING_ENDPOINT + + # We don't want the stream to use the same read timeout as the rest of the SDK. + http_factory = _http_factory(config) + stream_http_factory = HTTPFactory( + http_factory.base_headers, + http_factory.http_config, + override_read_timeout=STREAM_READ_TIMEOUT, + ) + + return SSEClientImpl( + connect=ConnectStrategy.http( + url=uri, + headers=http_factory.base_headers, + pool=stream_http_factory.create_pool_manager(1, uri), + urllib3_request_options={"timeout": stream_http_factory.timeout}, + ), + # we'll make error-handling decisions when we see a Fault + error_strategy=ErrorStrategy.always_continue(), + initial_retry_delay=config.initial_reconnect_delay, + retry_delay_strategy=RetryDelayStrategy.default( + max_delay=MAX_RETRY_DELAY, + backoff_multiplier=2, + jitter_multiplier=JITTER_RATIO, + ), + retry_delay_reset_threshold=BACKOFF_RESET_INTERVAL, + logger=log, + ) + + +class StreamingSynchronizer(Synchronizer): + """ + StreamingSynchronizer is a specific type of Synchronizer that handles + streaming data sources. + + It should implement the sync method to yield updates as they are received + from the streaming data source. + """ + + def __init__( + self, config: Config, sse_client_builder: SseClientBuilder = create_sse_client + ): + self._sse_client_builder = sse_client_builder + self._uri = config.stream_base_uri + STREAMING_ENDPOINT + if config.payload_filter_key is not None: + self._uri += "?%s" % parse.urlencode({"filter": config.payload_filter_key}) + self._config = config + self._sse: Optional[SSEClient] = None + + def sync(self) -> Iterable[Update]: + """ + sync should begin the synchronization process for the data source, yielding + Update objects until the connection is closed or an unrecoverable error + occurs. + """ + log.info("Starting StreamingUpdateProcessor connecting to uri: %s", self._uri) + self._sse = self._sse_client_builder(self._config) + if self._sse is None: + log.error("Failed to create SSE client for streaming updates.") + return + + change_set_builder = ChangeSetBuilder() + + for action in self._sse.all: + if isinstance(action, Fault): + # If the SSE client detects the stream has closed, then it will + # emit a fault with no-error. We can ignore this since we want + # the connection to continue. + if action.error is None: + continue + + (update, should_continue) = self._handle_error(action.error) + if update is not None: + yield update + + if not should_continue: + break + continue + + if not isinstance(action, Event): + continue + + try: + update = self._process_message(action, change_set_builder) + if update is not None: + yield update + except json.decoder.JSONDecodeError as e: + log.info( + "Error while handling stream event; will restart stream: %s", e + ) + # TODO(sdk-1409) + # self._sse.interrupt() + + (update, should_continue) = self._handle_error(e) + if update is not None: + yield update + if not should_continue: + break + except Exception as e: # pylint: disable=broad-except + log.info( + "Error while handling stream event; will restart stream: %s", e + ) + # TODO(sdk-1409) + # self._sse.interrupt() + + yield Update( + state=DataSourceState.INTERRUPTED, + error=DataSourceErrorInfo( + DataSourceErrorKind.UNKNOWN, 0, time(), str(e) + ), + revert_to_fdv1=False, + environment_id=None, # TODO(sdk-1410) + ) + + # TODO(sdk-1408) + # if update is not None: + # self._record_stream_init(False) + + # if self._data_source_update_sink is not None: + # self._data_source_update_sink.update_status( + # DataSourceState.VALID, None + # ) + + # if not self._ready.is_set(): + # log.info("StreamingUpdateProcessor initialized ok.") + # self._ready.set() + + # TODO(sdk-1409) + # self._sse.close() + + # TODO(sdk-1409) + # def stop(self): + # self.__stop_with_error_info(None) + # + # def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]): + # log.info("Stopping StreamingUpdateProcessor") + # self._running = False + # if self._sse: + # self._sse.close() + # + # if self._data_source_update_sink is None: + # return + # + # self._data_source_update_sink.update_status(DataSourceState.OFF, error) + + # pylint: disable=too-many-return-statements + def _process_message( + self, msg: Event, change_set_builder: ChangeSetBuilder + ) -> Optional[Update]: + """ + Processes a single message from the SSE stream and returns an Update + object if applicable. + + This method may raise exceptions if the message is malformed or if an + error occurs while processing the message. The caller should handle these + exceptions appropriately. + """ + if msg.event == EventName.HEARTBEAT: + return None + + if msg.event == EventName.SERVER_INTENT: + server_intent = ServerIntent.from_dict(json.loads(msg.data)) + change_set_builder.start(server_intent.payload.code) + + if server_intent.payload.code == IntentCode.TRANSFER_NONE: + change_set_builder.expect_changes() + return Update( + state=DataSourceState.VALID, + environment_id=None, # TODO(sdk-1410) + ) + return None + + if msg.event == EventName.PUT_OBJECT: + put = PutObject.from_dict(json.loads(msg.data)) + change_set_builder.add_put(put.kind, put.key, put.version, put.object) + return None + + if msg.event == EventName.DELETE_OBJECT: + delete = DeleteObject.from_dict(json.loads(msg.data)) + change_set_builder.add_delete(delete.kind, delete.key, delete.version) + return None + + if msg.event == EventName.GOODBYE: + goodbye = Goodbye.from_dict(json.loads(msg.data)) + if not goodbye.silent: + log.error( + "SSE server received error: %s (%s)", + goodbye.reason, + goodbye.catastrophe, + ) + + return None + + if msg.event == EventName.ERROR: + error = Error.from_dict(json.loads(msg.data)) + log.error("Error on %s: %s", error.payload_id, error.reason) + + # The protocol should "reset" any previous change events it has + # received, but should continue to operate under the assumption the + # last server intent was in effect. + # + # The server may choose to send a new server-intent, at which point + # we will set that as well. + change_set_builder.reset() + + return None + + if msg.event == EventName.PAYLOAD_TRANSFERRED: + selector = Selector.from_dict(json.loads(msg.data)) + change_set = change_set_builder.finish(selector) + + return Update( + state=DataSourceState.VALID, + change_set=change_set, + environment_id=None, # TODO(sdk-1410) + ) + + log.info("Unexpected event found in stream: %s", msg.event) + return None + + def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: + """ + This method handles errors that occur during the streaming process. + + It may return an update indicating the error state, and a boolean + indicating whether the synchronizer should continue retrying the connection. + + If an update is provided, it should be forward upstream, regardless of + whether or not we are going to retry this failure. + """ + # if not self._running: + # return (False, None) # don't retry if we've been deliberately stopped + + update: Optional[Update] = None + + if isinstance(error, json.decoder.JSONDecodeError): + log.error("Unexpected error on stream connection: %s, will retry", error) + + update = Update( + state=DataSourceState.INTERRUPTED, + error=DataSourceErrorInfo( + DataSourceErrorKind.INVALID_DATA, 0, time(), str(error) + ), + revert_to_fdv1=False, + environment_id=None, # TODO(sdk-1410) + ) + return (update, True) + + if isinstance(error, HTTPStatusError): + error_info = DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + error.status, + time(), + str(error), + ) + + http_error_message_result = http_error_message( + error.status, "stream connection" + ) + + is_recoverable = is_http_error_recoverable(error.status) + + update = Update( + state=( + DataSourceState.INTERRUPTED + if is_recoverable + else DataSourceState.OFF + ), + error=error_info, + revert_to_fdv1=False, + environment_id=None, # TODO(sdk-1410) + ) + + if not is_recoverable: + log.error(http_error_message_result) + # TODO(sdk-1409) + # self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited + # self.__stop_with_error_info(error_info) + # self.stop() + return (update, False) + + log.warning(http_error_message_result) + return (update, True) + + log.warning("Unexpected error on stream connection: %s, will retry", error) + + update = Update( + state=DataSourceState.INTERRUPTED, + error=DataSourceErrorInfo( + DataSourceErrorKind.UNKNOWN, 0, time(), str(error) + ), + revert_to_fdv1=False, + environment_id=None, # TODO(sdk-1410) + ) + # no stacktrace here because, for a typical connection error, it'll + # just be a lengthy tour of urllib3 internals + + return (update, True) + + # magic methods for "with" statement (used in testing) + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + # self.stop() + pass diff --git a/ldclient/impl/datasystem/protocolv2.py b/ldclient/impl/datasystem/protocolv2.py index 477a8479..e93cb23d 100644 --- a/ldclient/impl/datasystem/protocolv2.py +++ b/ldclient/impl/datasystem/protocolv2.py @@ -280,6 +280,85 @@ def from_dict(data: dict) -> "PutObject": ) +@dataclass(frozen=True) +class Goodbye: + """ + Goodbye represents a goodbye event. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + reason: str + silent: bool + catastrophe: bool + + def to_dict(self) -> dict: + """ + Serializes the Goodbye to a JSON-compatible dictionary. + """ + return { + "reason": self.reason, + "silent": self.silent, + "catastrophe": self.catastrophe, + } + + @staticmethod + def from_dict(data: dict) -> "Goodbye": + """ + Deserializes a Goodbye event from a JSON-compatible dictionary. + """ + reason = data.get("reason") + silent = data.get("silent") + catastrophe = data.get("catastrophe") + + if reason is None or silent is None or catastrophe is None: + raise ValueError("Missing required fields in Goodbye JSON.") + + return Goodbye(reason=reason, silent=silent, catastrophe=catastrophe) + + +@dataclass(frozen=True) +class Error: + """ + Error represents an error event. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + payload_id: str + reason: str + + def to_dict(self) -> dict: + """ + Serializes the Error to a JSON-compatible dictionary. + """ + return { + "payloadId": self.payload_id, + "reason": self.reason, + } + + @staticmethod + def from_dict(data: dict) -> "Error": + """ + Deserializes an Error from a JSON-compatible dictionary. + """ + payload_id = data.get("payloadId") + reason = data.get("reason") + + if payload_id is None or reason is None: + raise ValueError("Missing required fields in Error JSON.") + + return Error(payload_id=payload_id, reason=reason) + + @dataclass(frozen=True) class Selector: """ diff --git a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py new file mode 100644 index 00000000..e161c81d --- /dev/null +++ b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py @@ -0,0 +1,458 @@ +# pylint: disable=missing-docstring, too-few-public-methods + + +import json +from abc import abstractmethod +from typing import Iterable, List, Optional + +import pytest +from ld_eventsource.actions import Action +from ld_eventsource.http import HTTPStatusError +from ld_eventsource.sse_client import Event, Fault + +from ldclient.config import Config +from ldclient.impl.datasourcev2.streaming import ( + SSEClient, + SseClientBuilder, + StreamingSynchronizer +) +from ldclient.impl.datasystem.protocolv2 import ( + ChangeType, + DeleteObject, + Error, + EventName, + Goodbye, + IntentCode, + ObjectKind, + Payload, + PutObject, + Selector, + ServerIntent +) +from ldclient.interfaces import DataSourceErrorKind, DataSourceState + + +def list_sse_client( + events: Iterable[Action], # pylint: disable=redefined-outer-name +) -> SseClientBuilder: + def builder(_: Config) -> SSEClient: + return ListBasedSseClient(events) + + return builder + + +class ListBasedSseClient: + def __init__( + self, events: Optional[Iterable[Action]] = None + ): # pylint: disable=redefined-outer-name + self._events = [] if events is None else events + + @property + def all(self) -> Iterable[Action]: + return self._events + + +class HttpExceptionThrowingSseClient: + def __init__(self, status_codes: List[int]): # pylint: disable=redefined-outer-name + self._status_codes = status_codes + self._index = 0 + + @property + @abstractmethod + def all(self) -> Iterable[Action]: + if self._index >= len(self._status_codes): + raise IndexError("Invalid number of status codes provided") + + code = self._status_codes[self._index % len(self._status_codes)] + self._index += 1 + + raise HTTPStatusError(code) + + +def test_ignores_unknown_events(): + class UnknownTypeOfEvent(Action): + pass + + unknown_named_event = Event(event="Unknown") + builder = list_sse_client([UnknownTypeOfEvent(), unknown_named_event]) + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + + assert len(list(synchronizer.sync())) == 0 + + +def test_ignores_faults_without_errors(): + errorless_fault = Fault(error=None) + builder = list_sse_client([errorless_fault]) + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + + assert len(list(synchronizer.sync())) == 0 + + +@pytest.fixture +def events() -> dict: + server_intent = ServerIntent( + payload=Payload( + id="id", + target=300, + code=IntentCode.TRANSFER_FULL, + reason="cant-catchup", + ) + ) + intent_event = Event( + event=EventName.SERVER_INTENT, + data=json.dumps(server_intent.to_dict()), + ) + + put = PutObject( + version=100, kind=ObjectKind.FLAG, key="flag-key", object={"key": "flag-key"} + ) + put_event = Event( + event=EventName.PUT_OBJECT, + data=json.dumps(put.to_dict()), + ) + delete = DeleteObject(version=101, kind=ObjectKind.FLAG, key="flag-key") + delete_event = Event( + event=EventName.DELETE_OBJECT, + data=json.dumps(delete.to_dict()), + ) + + selector = Selector(state="p:SOMETHING:300", version=300) + payload_transferred_event = Event( + event=EventName.PAYLOAD_TRANSFERRED, + data=json.dumps(selector.to_dict()), + ) + + goodbye = Goodbye(reason="test reason", silent=True, catastrophe=False) + goodbye_event = Event( + event=EventName.GOODBYE, + data=json.dumps(goodbye.to_dict()), + ) + + error = Error(payload_id="p:SOMETHING:300", reason="test reason") + error_event = Event( + event=EventName.ERROR, + data=json.dumps(error.to_dict()), + ) + + heartbeat_event = Event(event=EventName.HEARTBEAT) + + return { + EventName.SERVER_INTENT: intent_event, + EventName.PAYLOAD_TRANSFERRED: payload_transferred_event, + EventName.PUT_OBJECT: put_event, + EventName.DELETE_OBJECT: delete_event, + EventName.GOODBYE: goodbye_event, + EventName.ERROR: error_event, + EventName.HEARTBEAT: heartbeat_event, + } + + +def test_handles_no_changes(): + server_intent = ServerIntent( + payload=Payload( + id="id", + target=300, + code=IntentCode.TRANSFER_NONE, + reason="up-to-date", + ) + ) + intent_event = Event( + event=EventName.SERVER_INTENT, + data=json.dumps(server_intent.to_dict()), + ) + builder = list_sse_client([intent_event]) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.VALID + assert updates[0].error is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + assert updates[0].change_set is None + + +def test_handles_empty_changeset(events): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + events[EventName.SERVER_INTENT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.VALID + assert updates[0].error is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 0 + assert updates[0].change_set.selector is not None + assert updates[0].change_set.selector.version == 300 + assert updates[0].change_set.selector.state == "p:SOMETHING:300" + assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL + + +def test_handles_put_objects(events): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + events[EventName.SERVER_INTENT], + events[EventName.PUT_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.VALID + assert updates[0].error is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 1 + assert updates[0].change_set.changes[0].action == ChangeType.PUT + assert updates[0].change_set.changes[0].kind == ObjectKind.FLAG + assert updates[0].change_set.changes[0].key == "flag-key" + assert updates[0].change_set.changes[0].object == {"key": "flag-key"} + assert updates[0].change_set.changes[0].version == 100 + assert updates[0].change_set.selector is not None + assert updates[0].change_set.selector.version == 300 + assert updates[0].change_set.selector.state == "p:SOMETHING:300" + assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL + + +def test_handles_delete_objects(events): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + events[EventName.SERVER_INTENT], + events[EventName.DELETE_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.VALID + assert updates[0].error is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 1 + assert updates[0].change_set.changes[0].action == ChangeType.DELETE + assert updates[0].change_set.changes[0].kind == ObjectKind.FLAG + assert updates[0].change_set.changes[0].key == "flag-key" + assert updates[0].change_set.changes[0].version == 101 + assert updates[0].change_set.selector is not None + assert updates[0].change_set.selector.version == 300 + assert updates[0].change_set.selector.state == "p:SOMETHING:300" + assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL + + +def test_swallows_goodbye(events): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + events[EventName.SERVER_INTENT], + events[EventName.GOODBYE], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.VALID + assert updates[0].error is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 0 + assert updates[0].change_set.selector is not None + assert updates[0].change_set.selector.version == 300 + assert updates[0].change_set.selector.state == "p:SOMETHING:300" + assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL + + +def test_swallows_heartbeat(events): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + events[EventName.SERVER_INTENT], + events[EventName.HEARTBEAT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.VALID + assert updates[0].error is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 0 + assert updates[0].change_set.selector is not None + assert updates[0].change_set.selector.version == 300 + assert updates[0].change_set.selector.state == "p:SOMETHING:300" + assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL + + +def test_error_resets(events): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + events[EventName.SERVER_INTENT], + events[EventName.PUT_OBJECT], + events[EventName.ERROR], + events[EventName.DELETE_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.VALID + assert updates[0].error is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 1 + assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL + assert updates[0].change_set.changes[0].action == ChangeType.DELETE + + +def test_handles_out_of_order(events): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + events[EventName.PUT_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.INTERRUPTED + assert updates[0].change_set is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].error is not None + assert updates[0].error.kind == DataSourceErrorKind.UNKNOWN + assert updates[0].error.status_code == 0 + + +def test_invalid_json_decoding(events): # pylint: disable=redefined-outer-name + intent_event = Event( + event=EventName.SERVER_INTENT, + data="{invalid_json", + ) + builder = list_sse_client( + [ + # This will generate an error but the stream should continue + intent_event, + # We send these valid combinations to ensure we get the stream back + # on track. + events[EventName.SERVER_INTENT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 2 + assert updates[0].state == DataSourceState.INTERRUPTED + assert updates[0].change_set is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].error is not None + assert updates[0].error.kind == DataSourceErrorKind.INVALID_DATA + assert updates[0].error.status_code == 0 + + assert updates[1].state == DataSourceState.VALID + assert updates[1].change_set is not None + assert len(updates[1].change_set.changes) == 0 + + +def test_stops_on_unrecoverable_status_code( + events, +): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + # This will generate an error but the stream should continue + Fault(error=HTTPStatusError(401)), + # We send these valid combinations to ensure the stream is NOT + # being processed after the 401. + events[EventName.SERVER_INTENT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.OFF + assert updates[0].change_set is None + assert updates[0].revert_to_fdv1 is False + assert updates[0].environment_id is None + + assert updates[0].error is not None + assert updates[0].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert updates[0].error.status_code == 401 + + +def test_continues_on_recoverable_status_code( + events, +): # pylint: disable=redefined-outer-name + builder = list_sse_client( + [ + # This will generate an error but the stream should continue + Fault(error=HTTPStatusError(400)), + events[EventName.SERVER_INTENT], + Fault(error=HTTPStatusError(408)), + # We send these valid combinations to ensure the stream will + # continue to be processed. + events[EventName.SERVER_INTENT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) + updates = list(synchronizer.sync()) + + assert len(updates) == 3 + assert updates[0].state == DataSourceState.INTERRUPTED + assert updates[0].error is not None + assert updates[0].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert updates[0].error.status_code == 400 + + assert updates[1].state == DataSourceState.INTERRUPTED + assert updates[1].error is not None + assert updates[1].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert updates[1].error.status_code == 408 + + assert updates[2].state == DataSourceState.VALID + assert updates[2].change_set is not None + assert len(updates[2].change_set.changes) == 0 + assert updates[2].change_set.selector.version == 300 + assert updates[2].change_set.selector.state == "p:SOMETHING:300" + assert updates[2].change_set.intent_code == IntentCode.TRANSFER_FULL From fa152f5e1004ddfa848896c6df19c09db0a26f1d Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 31 Jul 2025 10:59:43 -0400 Subject: [PATCH 2/3] Better represent the return of streaming as a generator --- ldclient/impl/datasourcev2/__init__.py | 4 ++-- ldclient/impl/datasourcev2/streaming.py | 20 ++++++-------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/ldclient/impl/datasourcev2/__init__.py b/ldclient/impl/datasourcev2/__init__.py index 15284142..ee2d12ad 100644 --- a/ldclient/impl/datasourcev2/__init__.py +++ b/ldclient/impl/datasourcev2/__init__.py @@ -12,7 +12,7 @@ from abc import abstractmethod from dataclasses import dataclass -from typing import Iterable, Mapping, Optional, Protocol, Tuple +from typing import Generator, Iterable, Mapping, Optional, Protocol, Tuple from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector from ldclient.impl.util import _Result @@ -65,7 +65,7 @@ class Synchronizer(Protocol): # pylint: disable=too-few-public-methods """ @abstractmethod - def sync(self) -> Iterable[Update]: + def sync(self) -> Generator[Update, None, None]: """ sync should begin the synchronization process for the data source, yielding Update objects until the connection is closed or an unrecoverable error diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 89177dd4..0c453996 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -6,16 +6,12 @@ import json from abc import abstractmethod from time import time -from typing import Callable, Iterable, Optional, Protocol, Tuple +from typing import Callable, Generator, Iterable, Optional, Protocol, Tuple from urllib import parse from ld_eventsource import SSEClient as SSEClientImpl from ld_eventsource.actions import Action, Event, Fault -from ld_eventsource.config import ( - ConnectStrategy, - ErrorStrategy, - RetryDelayStrategy -) +from ld_eventsource.config import ConnectStrategy, ErrorStrategy, RetryDelayStrategy from ld_eventsource.errors import HTTPStatusError from ldclient.config import Config @@ -29,18 +25,14 @@ IntentCode, PutObject, Selector, - ServerIntent + ServerIntent, ) from ldclient.impl.http import HTTPFactory, _http_factory -from ldclient.impl.util import ( - http_error_message, - is_http_error_recoverable, - log -) +from ldclient.impl.util import http_error_message, is_http_error_recoverable, log from ldclient.interfaces import ( DataSourceErrorInfo, DataSourceErrorKind, - DataSourceState + DataSourceState, ) # allows for up to 5 minutes to elapse without any data sent across the stream. @@ -129,7 +121,7 @@ def __init__( self._config = config self._sse: Optional[SSEClient] = None - def sync(self) -> Iterable[Update]: + def sync(self) -> Generator[Update, None, None]: """ sync should begin the synchronization process for the data source, yielding Update objects until the connection is closed or an unrecoverable error From 5f373463e3a7002f2b822b2a9067a3ce27a99e9d Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 31 Jul 2025 11:27:33 -0400 Subject: [PATCH 3/3] sorting --- ldclient/impl/datasourcev2/streaming.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 0c453996..f4f5638b 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -11,7 +11,11 @@ from ld_eventsource import SSEClient as SSEClientImpl from ld_eventsource.actions import Action, Event, Fault -from ld_eventsource.config import ConnectStrategy, ErrorStrategy, RetryDelayStrategy +from ld_eventsource.config import ( + ConnectStrategy, + ErrorStrategy, + RetryDelayStrategy +) from ld_eventsource.errors import HTTPStatusError from ldclient.config import Config @@ -25,14 +29,18 @@ IntentCode, PutObject, Selector, - ServerIntent, + ServerIntent ) from ldclient.impl.http import HTTPFactory, _http_factory -from ldclient.impl.util import http_error_message, is_http_error_recoverable, log +from ldclient.impl.util import ( + http_error_message, + is_http_error_recoverable, + log +) from ldclient.interfaces import ( DataSourceErrorInfo, DataSourceErrorKind, - DataSourceState, + DataSourceState ) # allows for up to 5 minutes to elapse without any data sent across the stream.