From 8802eb436377ac17e034fdd26bdb5517985ea75a Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 22 Jul 2025 12:40:17 -0400 Subject: [PATCH 1/5] chore: Initial implementation of FDv2 types and polling initializer --- ldclient/impl/datasourcev2/__init__.py | 6 + ldclient/impl/datasourcev2/polling.py | 272 +++++++++ ldclient/impl/datasystem/__init__.py | 49 ++ ldclient/impl/datasystem/protocolv2.py | 550 ++++++++++++++++++ ldclient/impl/events/types.py | 126 +++- ldclient/impl/util.py | 36 +- .../datasourcev2/test_polling_initializer.py | 152 +++++ .../test_polling_payload_parsing.py | 150 +++++ 8 files changed, 1317 insertions(+), 24 deletions(-) create mode 100644 ldclient/impl/datasourcev2/__init__.py create mode 100644 ldclient/impl/datasourcev2/polling.py create mode 100644 ldclient/impl/datasystem/__init__.py create mode 100644 ldclient/impl/datasystem/protocolv2.py create mode 100644 ldclient/testing/impl/datasourcev2/test_polling_initializer.py create mode 100644 ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py diff --git a/ldclient/impl/datasourcev2/__init__.py b/ldclient/impl/datasourcev2/__init__.py new file mode 100644 index 00000000..1979b2ce --- /dev/null +++ b/ldclient/impl/datasourcev2/__init__.py @@ -0,0 +1,6 @@ +""" +This module houses FDv2 types and implementations of synchronizers and +initializers for the datasystem. +""" + +__all__: list[str] = [] diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py new file mode 100644 index 00000000..a119536b --- /dev/null +++ b/ldclient/impl/datasourcev2/polling.py @@ -0,0 +1,272 @@ +""" +Default implementation of the polling synchronizer and initializer. +""" + +import json +from abc import abstractmethod +from collections import namedtuple +from collections.abc import Mapping +from typing import Optional, Protocol, Tuple +from urllib import parse + +import urllib3 + +from ldclient.impl.datasystem.protocolv2 import ( + Basis, + ChangeSet, + ChangeSetBuilder, + DeleteObject, + EventName, + IntentCode, + PutObject, + Selector, + ServerIntent, +) +from ldclient.impl.http import _http_factory +from ldclient.impl.repeating_task import RepeatingTask +from ldclient.impl.util import ( + Result, + UnsuccessfulResponseException, + _Fail, + _headers, + _Result, + _Success, + http_error_message, + is_http_error_recoverable, + log, +) + +POLLING_ENDPOINT = "/sdk/poll" + +PollingResult = _Result[Tuple[ChangeSet, Mapping], str] + + +class PollingRequester(Protocol): # pylint: disable=too-few-public-methods + """ + PollingRequester allows PollingDataSource to delegate fetching data to + another component. + + This is useful for testing the PollingDataSource without needing to set up + a test HTTP server. + """ + + @abstractmethod + def fetch(self, selector: Optional[Selector]) -> PollingResult: + """ + Fetches the data for the given selector. + Returns a Result containing a tuple of ChangeSet and any request headers, + or an error if the data could not be retrieved. + """ + raise NotImplementedError + + +CacheEntry = namedtuple("CacheEntry", ["data", "etag"]) + + +class PollingDataSource: + """ + PollingDataSource is a data source that can retrieve information from + LaunchDarkly either as an Initializer or as a Synchronizer. + """ + + def __init__( + self, + poll_interval: float, + requester: PollingRequester, + ): + self._requester = requester + self._task = RepeatingTask( + "ldclient.datasource.polling", poll_interval, 0, self._poll + ) + + def name(self) -> str: + """Returns the name of the initializer.""" + return "PollingDataSourceV2" + + def fetch(self) -> Result: # Result[Basis]: + """ + Fetch returns a Basis, or an error if the Basis could not be retrieved. + """ + return self._poll() + + # TODO(fdv2): This will need to be converted into a synchronizer at some point. + # def start(self): + # log.info( + # "Starting PollingUpdateProcessor with request interval: " + # + str(self._config.poll_interval) + # ) + # self._task.start() + + def _poll(self) -> Result: # Result[Basis]: + try: + # TODO(fdv2): Need to pass the selector through + result = self._requester.fetch(None) + + if isinstance(result, _Fail): + if isinstance(result.exception, UnsuccessfulResponseException): + status_code = result.exception.status + http_error_message_result = http_error_message( + status_code, "polling request" + ) + if is_http_error_recoverable(status_code): + log.warning(http_error_message_result) + + return Result.fail(http_error_message_result, result.exception) + + return Result.fail( + result.error or "Failed to request payload", result.exception + ) + + (change_set, headers) = result.value + + env_id = headers.get("X-LD-EnvID") + if not isinstance(env_id, str): + env_id = None + + basis = Basis( + change_set=change_set, + persist=change_set.selector is not None, + environment_id=env_id, + ) + + return Result.success(basis) + except Exception as e: + msg = f"Error: Exception encountered when updating flags. {e}" + log.exception(msg) + + return Result.fail(msg, e) + + +# pylint: disable=too-few-public-methods +class Urllib3PollingRequester: + """ + Urllib3PollingRequester is a PollingRequester that uses urllib3 to make HTTP requests. + """ + + def __init__(self, config): + self._etag = None + self._http = _http_factory(config).create_pool_manager(1, config.base_uri) + self._config = config + self._poll_uri = config.base_uri + POLLING_ENDPOINT + + def fetch(self, selector: Optional[Selector]) -> PollingResult: + """ + Fetches the data for the given selector. + Returns a Result containing a tuple of ChangeSet and any request headers, + or an error if the data could not be retrieved. + """ + query_params = {} + if self._config.payload_filter_key is not None: + query_params["filter"] = self._config.payload_filter_key + + if selector is not None: + query_params["selector"] = selector.state + + if len(query_params) > 0: + filter_query = parse.urlencode(query_params) + self._poll_uri += f"?{filter_query}" + + uri = self._poll_uri + hdrs = _headers(self._config) + hdrs["Accept-Encoding"] = "gzip" + + if self._etag is not None: + hdrs["If-None-Match"] = self._etag + + response = self._http.request( + "GET", + uri, + headers=hdrs, + timeout=urllib3.Timeout( + connect=self._config.http.connect_timeout, + read=self._config.http.read_timeout, + ), + retries=1, + ) + + if response.status >= 400: + return _Fail( + f"HTTP error {response}", UnsuccessfulResponseException(response.status) + ) + + headers = response.headers + + if response.status == 304: + return _Success(value=(ChangeSetBuilder.no_changes(), headers)) + + data = json.loads(response.data.decode("UTF-8")) + etag = headers.get("ETag") + + if etag is not None: + self._etag = etag + + log.debug( + "%s response status:[%d] ETag:[%s]", + uri, + response.status, + etag, + ) + + changeset_result = polling_payload_to_changeset(data) + if isinstance(changeset_result, _Success): + return _Success(value=(changeset_result.value, headers)) + + return _Fail( + error=changeset_result.error, + exception=changeset_result.exception, + ) + + +# pylint: disable=too-many-branches,too-many-return-statements +def polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]: + """ + Converts a polling payload into a ChangeSet. + """ + if "events" not in data or not isinstance(data["events"], list): + return _Fail(error="Invalid payload: 'events' key is missing or not a list") + + builder = ChangeSetBuilder() + + for event in data["events"]: + if not isinstance(event, dict): + return _Fail(error="Invalid payload: 'events' must be a list of objects") + + for event in data["events"]: + if event["event"] == EventName.SERVER_INTENT: + try: + server_intent = ServerIntent.from_dict(event["data"]) + except ValueError as err: + return _Fail(error="Invalid JSON in server intent", exception=err) + + if server_intent.payload.code == IntentCode.TRANSFER_NONE: + return _Success(ChangeSetBuilder.no_changes()) + + builder.start(server_intent.payload.code) + elif event["event"] == EventName.PUT_OBJECT: + try: + put = PutObject.from_dict(event["data"]) + except ValueError as err: + return _Fail(error="Invalid JSON in put object", exception=err) + + builder.add_put(put.kind, put.key, put.version, put.object) + elif event["event"] == EventName.DELETE_OBJECT: + try: + delete_object = DeleteObject.from_dict(event["data"]) + except ValueError as err: + return _Fail(error="Invalid JSON in delete object", exception=err) + + builder.add_delete( + delete_object.kind, delete_object.key, delete_object.version + ) + elif event["event"] == EventName.PAYLOAD_TRANSFERRED: + try: + selector = Selector.from_dict(event["data"]) + changeset = builder.finish(selector) + + return _Success(value=changeset) + except ValueError as err: + return _Fail( + error="Invalid JSON in payload transferred object", exception=err + ) + + return _Fail(error="didn't receive any known protocol events in polling payload") diff --git a/ldclient/impl/datasystem/__init__.py b/ldclient/impl/datasystem/__init__.py new file mode 100644 index 00000000..ad05eb50 --- /dev/null +++ b/ldclient/impl/datasystem/__init__.py @@ -0,0 +1,49 @@ +""" +This package contains the generic interfaces used for the data system (v1 and +v2), as well as types for v1 and v2 specific protocols. +""" + +from abc import abstractmethod +from typing import Protocol + +from ldclient.impl.util import Result + + +class Synchronizer(Protocol): + """ + Represents a component capable of obtaining a Basis and subsequent delta + updates asynchronously. + """ + + @abstractmethod + def name(self) -> str: + """Returns the name of the initializer.""" + raise NotImplementedError + + # TODO(fdv2): Need sync method + + def close(self): + """ + Close the synchronizer, releasing any resources it holds. + """ + + +class Initializer(Protocol): + """ + Represents a component capable of obtaining a Basis via a synchronous call. + """ + + @abstractmethod + def name(self) -> str: + """Returns the name of the initializer.""" + raise NotImplementedError + + @abstractmethod + def fetch(self) -> Result: + """ + Fetch returns a Basis, or an error if the Basis could not be retrieved. + """ + raise NotImplementedError + + +__all__: list[str] = ["Synchronizer", "Initializer"] diff --git a/ldclient/impl/datasystem/protocolv2.py b/ldclient/impl/datasystem/protocolv2.py new file mode 100644 index 00000000..8a29ac82 --- /dev/null +++ b/ldclient/impl/datasystem/protocolv2.py @@ -0,0 +1,550 @@ +""" +This module contains the protocol definitions and data types for the +LaunchDarkly data system version 2 (FDv2). +""" + +from abc import abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import Any, List, Optional, Protocol + +from ldclient.impl.util import Result + + +class EventName(str, Enum): + """ + EventName represents the name of an event that can be sent by the server for FDv2. + """ + + PUT_OBJECT = "put-object" + """ + Specifies that an object should be added to the data set with upsert semantics. + """ + + DELETE_OBJECT = "delete-object" + """ + Specifies that an object should be removed from the data set. + """ + + SERVER_INTENT = "server-intent" + """ + Specifies the server's intent. + """ + + PAYLOAD_TRANSFERRED = "payload-transferred" + """ + Specifies that that all data required to bring the existing data set to + a new version has been transferred. + """ + + HEARTBEAT = "heart-beat" + """ + Keeps the connection alive. + """ + + GOODBYE = "goodbye" + """ + Specifies that the server is about to close the connection. + """ + + ERROR = "error" + """ + Specifies that an error occurred while serving the connection. + """ + + +class IntentCode(str, Enum): + """ + IntentCode represents the various intents that can be sent by the server. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + TRANSFER_FULL = "xfer-full" + """ + The server intends to send a full data set. + """ + TRANSFER_CHANGES = "xfer-changes" + """ + The server intends to send only the necessary changes to bring an existing + data set up-to-date. + """ + + TRANSFER_NONE = "none" + """ + The server intends to send no data (payload is up to date). + """ + + +@dataclass(frozen=True) +class Payload: + """ + Payload represents a payload delivered in a streaming response. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + id: str + target: int + code: IntentCode + reason: str + + def to_dict(self) -> dict: + """ + Serializes the Payload to a JSON-compatible dictionary. + """ + return { + "id": self.id, + "target": self.target, + "intentCode": self.code.value, + "reason": self.reason, + } + + @staticmethod + def from_dict(data: dict) -> "Payload": + """ + Create a Payload from a dictionary representation. + """ + return Payload( + id=data.get("id", ""), + target=data.get("target", 0), + code=IntentCode(data.get("intentCode")), + reason=data.get("reason", ""), + ) + + +@dataclass(frozen=True) +class ServerIntent: + """ + ServerIntent represents the type of change associated with the payload + (e.g., transfer full, transfer changes, etc.) + """ + + payload: Payload + + def to_dict(self) -> dict: + """ + Serializes the ServerIntent to a JSON-compatible dictionary. + """ + return { + "payloads": [self.payload.to_dict()], + } + + @staticmethod + def from_dict(data: dict) -> "ServerIntent": + """ + Create a ServerIntent from a dictionary representation. + """ + if "payloads" not in data or not isinstance(data["payloads"], list): + raise ValueError( + "Invalid data for ServerIntent: 'payloads' key is missing or not a list" + ) + if len(data["payloads"]) != 1: + raise ValueError( + "Invalid data for ServerIntent: expected exactly one payload" + ) + + payload = data["payloads"][0] + if not isinstance(payload, dict): + raise ValueError("Invalid payload in ServerIntent: expected a dictionary") + + return ServerIntent(payload=Payload.from_dict(payload)) + + +class ObjectKind(str, Enum): + """ + ObjectKind represents the kind of object. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + FLAG = "flag" + SEGMENT = "segment" + + +@dataclass(frozen=True) +class DeleteObject: + """ + Specifies the deletion of a particular object. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + version: int + kind: ObjectKind + key: str + + def name(self) -> str: + """ + Event method. + """ + return EventName.DELETE_OBJECT + + def to_dict(self) -> dict: + """ + Serializes the DeleteObject to a JSON-compatible dictionary. + """ + return { + "version": self.version, + "kind": self.kind.value, + "key": self.key, + } + + @staticmethod + def from_dict(data: dict) -> "DeleteObject": + """ + Deserializes a DeleteObject from a JSON-compatible dictionary. + """ + version = data.get("version") + kind = ObjectKind(data.get("kind")) + key = data.get("key") + + if version is None or kind is None or key is None: + raise ValueError("Missing required fields in DeleteObject JSON.") + + return DeleteObject(version=version, kind=kind, key=key) + + +@dataclass(frozen=True) +class PutObject: + """ + Specifies the addition of a particular object with upsert semantics. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + version: int + kind: ObjectKind + key: str + object: dict + + def name(self) -> str: + """ + Event method. + """ + return EventName.PUT_OBJECT + + def to_dict(self) -> dict: + """ + Serializes the PutObject to a JSON-compatible dictionary. + """ + return { + "version": self.version, + "kind": self.kind.value, + "key": self.key, + "object": self.object, + } + + @staticmethod + def from_dict(data: dict) -> "PutObject": + """ + Deserializes a PutObject from a JSON-compatible dictionary. + """ + version = data.get("version") + kind = ObjectKind(data.get("kind")) + key = data.get("key") + object_data = data.get("object") + + if version is None or kind is None or key is None or object_data is None: + raise ValueError("Missing required fields in PutObject JSON.") + + return PutObject(version=version, kind=kind, key=key, object=object_data) + + +@dataclass(frozen=True) +class Selector: + """ + Selector represents a particular snapshot of data. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + state: str = "" + version: int = 0 + + @staticmethod + def no_selector() -> "Selector": + """ + Returns an empty Selector. + """ + return Selector() + + def is_defined(self) -> bool: + """ + Returns True if the Selector has a value. + """ + return self != Selector.no_selector() + + def name(self) -> str: + """ + Event method. + """ + return EventName.PAYLOAD_TRANSFERRED + + @staticmethod + def new_selector(state: str, version: int) -> "Selector": + """ + Creates a new Selector from a state string and version. + """ + return Selector(state=state, version=version) + + def to_dict(self) -> dict: + """ + Serializes the Selector to a JSON-compatible dictionary. + """ + return {"state": self.state, "version": self.version} + + @staticmethod + def from_dict(data: dict) -> "Selector": + """ + Deserializes a Selector from a JSON-compatible dictionary. + """ + state = data.get("state") + version = data.get("version") + + if state is None or version is None: + raise ValueError("Missing required fields in Selector JSON.") + + return Selector(state=state, version=version) + + +class ChangeType(Enum): + """ + ChangeType specifies if an object is being upserted or deleted. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + PUT = "put" + """ + Represents an object being upserted. + """ + + DELETE = "delete" + """ + Represents an object being deleted. + """ + + +@dataclass(frozen=True) +class Change: + """ + Change represents a change to a piece of data, such as an update or deletion. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + action: ChangeType + kind: ObjectKind + key: str + version: int + object: Any = ( + None # TODO(fdv2): At some point, we should define a better type for this. + ) + + +@dataclass(frozen=True) +class ChangeSet: + """ + ChangeSet represents a list of changes to be applied. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + intent_code: IntentCode + changes: List[Change] + selector: Optional[Selector] + + +@dataclass(frozen=True) +class Basis: + """ + Basis represents the initial payload of data that a data source can + provide. Initializers provide this via fetch, whereas Synchronizers provide + it asynchronously. + """ + + change_set: ChangeSet + persist: bool + environment_id: Optional[str] = None + + +class Synchronizer(Protocol): + """ + Represents a component capable of obtaining a Basis and subsequent delta + updates asynchronously. + """ + + @abstractmethod + def name(self) -> str: + """Returns the name of the initializer.""" + raise NotImplementedError + + # TODO(fdv2): Need sync method + + def close(self): + """ + Close the synchronizer, releasing any resources it holds. + """ + + +class Initializer(Protocol): + """ + Represents a component capable of obtaining a Basis via a synchronous call. + """ + + @abstractmethod + def name(self) -> str: + """Returns the name of the initializer.""" + raise NotImplementedError + + @abstractmethod + def fetch(self) -> Result: + """ + Fetch returns a Basis, or an error if the Basis could not be retrieved. + """ + raise NotImplementedError + + +class ChangeSetBuilder: + """ + ChangeSetBuilder is a helper for constructing a ChangeSet. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + def __init__(self): + self.intent: Optional[IntentCode] = None + self.changes: List[Change] = [] + + @staticmethod + def no_changes() -> "ChangeSet": + """ + Represents an intent that the current data is up-to-date and doesn't + require changes. + """ + return ChangeSet( + intent_code=IntentCode.TRANSFER_NONE, selector=None, changes=[] + ) + + @staticmethod + def empty(selector) -> "ChangeSet": + """ + Returns an empty ChangeSet, which is useful for initializing a client + without data or for clearing out all existing data. + """ + return ChangeSet( + intent_code=IntentCode.TRANSFER_FULL, selector=selector, changes=[] + ) + + def start(self, intent: IntentCode): + """ + Begins a new change set with a given intent. + """ + self.intent = intent + self.changes = [] + + def expect_changes(self): + """ + Ensures that the current ChangeSetBuilder is prepared to handle changes. + + If a data source's initial connection reflects an updated status, we + need to keep the provided server intent. This allows subsequent changes + to come down the line without an explicit server intent. + + However, to maintain logical consistency, we need to ensure that the intent + is set to IntentTransferChanges. + """ + if self.intent is None: + raise ValueError("changeset: cannot expect changes without a server-intent") + + if self.intent != IntentCode.TRANSFER_NONE: + return + + self.intent = IntentCode.TRANSFER_CHANGES + + def reset(self): + """ + Clears any existing changes while preserving the current intent. + """ + self.changes = [] + + def finish(self, selector) -> ChangeSet: + """ + Identifies a changeset with a selector and returns the completed + changeset. Clears any existing changes while preserving the current + intent, so the builder can be reused. + """ + if self.intent is None: + raise ValueError("changeset: cannot complete without a server-intent") + + changeset = ChangeSet( + intent_code=self.intent, selector=selector, changes=self.changes + ) + self.changes = [] + + # Once a full transfer has been processed, all future changes should be + # assumed to be changes. Flag delivery can override this behavior by + # sending a new server intent to any connected stream. + if self.intent == IntentCode.TRANSFER_FULL: + self.intent = IntentCode.TRANSFER_CHANGES + + return changeset + + def add_put(self, kind, key, version, obj): + """ + Adds a new object to the changeset. + """ + self.changes.append( + Change( + action=ChangeType.PUT, kind=kind, key=key, version=version, object=obj + ) + ) + + def add_delete(self, kind, key, version): + """ + Adds a deletion to the changeset. + """ + self.changes.append( + Change(action=ChangeType.DELETE, kind=kind, key=key, version=version) + ) diff --git a/ldclient/impl/events/types.py b/ldclient/impl/events/types.py index 900d2b13..d0883c47 100644 --- a/ldclient/impl/events/types.py +++ b/ldclient/impl/events/types.py @@ -15,25 +15,44 @@ class EventInput: - __slots__ = ['timestamp', 'context', 'sampling_ratio'] + __slots__ = ["timestamp", "context", "sampling_ratio"] - def __init__(self, timestamp: int, context: Context, sampling_ratio: Optional[int] = None): + def __init__( + self, timestamp: int, context: Context, sampling_ratio: Optional[int] = None + ): self.timestamp = timestamp self.context = context self.sampling_ratio = sampling_ratio def __repr__(self) -> str: # used only in test debugging - return "%s(%s)" % (self.__class__.__name__, json.dumps(self.to_debugging_dict())) + return "%s(%s)" % ( + self.__class__.__name__, + json.dumps(self.to_debugging_dict()), + ) def __eq__(self, other) -> bool: # used only in tests - return isinstance(other, EventInput) and self.to_debugging_dict() == other.to_debugging_dict() + return ( + isinstance(other, EventInput) + and self.to_debugging_dict() == other.to_debugging_dict() + ) def to_debugging_dict(self) -> dict: return {} class EventInputEvaluation(EventInput): - __slots__ = ['key', 'flag', 'variation', 'value', 'reason', 'default_value', 'prereq_of', 'track_events', 'sampling_ratio', 'exclude_from_summaries'] + __slots__ = [ + "key", + "flag", + "variation", + "value", + "reason", + "default_value", + "prereq_of", + "track_events", + "sampling_ratio", + "exclude_from_summaries", + ] def __init__( self, @@ -57,7 +76,9 @@ def __init__( self.default_value = default_value self.prereq_of = prereq_of self.track_events = track_events - self.exclude_from_summaries = False if flag is None else flag.exclude_from_summaries + self.exclude_from_summaries = ( + False if flag is None else flag.exclude_from_summaries + ) def to_debugging_dict(self) -> dict: return { @@ -86,16 +107,30 @@ def to_debugging_dict(self) -> dict: class EventInputCustom(EventInput): - __slots__ = ['key', 'data', 'metric_value'] + __slots__ = ["key", "data", "metric_value"] - def __init__(self, timestamp: int, context: Context, key: str, data: Any = None, metric_value: Optional[AnyNum] = None): + def __init__( + self, + timestamp: int, + context: Context, + key: str, + data: Any = None, + metric_value: Optional[AnyNum] = None, + ): super().__init__(timestamp, context) self.key = key self.data = data self.metric_value = metric_value # type: Optional[int|float|complex] def to_debugging_dict(self) -> dict: - return {"timestamp": self.timestamp, "context": self.context.to_dict(), "sampling_ratio": self.sampling_ratio, "key": self.key, "data": self.data, "metric_value": self.metric_value} + return { + "timestamp": self.timestamp, + "context": self.context.to_dict(), + "sampling_ratio": self.sampling_ratio, + "key": self.key, + "data": self.data, + "metric_value": self.metric_value, + } # Event constructors are centralized here to avoid mistakes and repetitive logic. @@ -107,11 +142,20 @@ def to_debugging_dict(self) -> dict: class EventFactory: - def __init__(self, with_reasons: bool, timestamp_fn: Callable[[], int] = current_time_millis): + def __init__( + self, with_reasons: bool, timestamp_fn: Callable[[], int] = current_time_millis + ): self._with_reasons = with_reasons self._timestamp_fn = timestamp_fn - def new_eval_event(self, flag: FeatureFlag, context: Context, detail: EvaluationDetail, default_value: Any, prereq_of_flag: Optional[FeatureFlag] = None) -> EventInputEvaluation: + def new_eval_event( + self, + flag: FeatureFlag, + context: Context, + detail: EvaluationDetail, + default_value: Any, + prereq_of_flag: Optional[FeatureFlag] = None, + ) -> EventInputEvaluation: add_experiment_data = self.is_experiment(flag, detail.reason) return EventInputEvaluation( self._timestamp_fn(), @@ -126,28 +170,66 @@ def new_eval_event(self, flag: FeatureFlag, context: Context, detail: Evaluation flag.track_events or add_experiment_data, ) - def new_default_event(self, flag: FeatureFlag, context: Context, default_value: Any, reason: Optional[dict]) -> EventInputEvaluation: - return EventInputEvaluation(self._timestamp_fn(), context, flag.key, flag, None, default_value, reason if self._with_reasons else None, default_value, None, flag.track_events) + def new_default_event( + self, + flag: FeatureFlag, + context: Context, + default_value: Any, + reason: Optional[dict], + ) -> EventInputEvaluation: + return EventInputEvaluation( + self._timestamp_fn(), + context, + flag.key, + flag, + None, + default_value, + reason if self._with_reasons else None, + default_value, + None, + flag.track_events, + ) - def new_unknown_flag_event(self, key: str, context: Context, default_value: Any, reason: Optional[dict]) -> EventInputEvaluation: - return EventInputEvaluation(self._timestamp_fn(), context, key, None, None, default_value, reason if self._with_reasons else None, default_value, None, False) + def new_unknown_flag_event( + self, key: str, context: Context, default_value: Any, reason: Optional[dict] + ) -> EventInputEvaluation: + return EventInputEvaluation( + self._timestamp_fn(), + context, + key, + None, + None, + default_value, + reason if self._with_reasons else None, + default_value, + None, + False, + ) def new_identify_event(self, context: Context) -> EventInputIdentify: return EventInputIdentify(self._timestamp_fn(), context) - def new_custom_event(self, event_name: str, context: Context, data: Any, metric_value: Optional[AnyNum]) -> EventInputCustom: - return EventInputCustom(self._timestamp_fn(), context, event_name, data, metric_value) + def new_custom_event( + self, + event_name: str, + context: Context, + data: Any, + metric_value: Optional[AnyNum], + ) -> EventInputCustom: + return EventInputCustom( + self._timestamp_fn(), context, event_name, data, metric_value + ) @staticmethod def is_experiment(flag: FeatureFlag, reason: Optional[dict]) -> bool: if reason is not None: - if reason.get('inExperiment'): + if reason.get("inExperiment"): return True - kind = reason['kind'] - if kind == 'RULE_MATCH': - index = reason['ruleIndex'] + kind = reason["kind"] + if kind == "RULE_MATCH": + index = reason["ruleIndex"] rules = flag.rules return index >= 0 and index < len(rules) and rules[index].track_events - elif kind == 'FALLTHROUGH': + elif kind == "FALLTHROUGH": return flag.track_events_fallthrough return False diff --git a/ldclient/impl/util.py b/ldclient/impl/util.py index 968c87d8..b827f88f 100644 --- a/ldclient/impl/util.py +++ b/ldclient/impl/util.py @@ -2,8 +2,9 @@ import re import sys import time +from dataclasses import dataclass from datetime import timedelta -from typing import Any, Optional +from typing import Any, Generic, Optional, TypeVar, Union from urllib.parse import urlparse, urlunparse from ldclient.impl.http import _base_headers @@ -161,7 +162,7 @@ class Result: Results can either be considered a success or a failure. - In the event of success, the Result will contain an option, nullable value + In the event of success, the Result will contain an optional, nullable value to hold any success value back to the calling function. If the operation fails, the Result will contain an error describing the @@ -220,12 +221,43 @@ def is_success(self) -> bool: @property def value(self) -> Optional[Any]: + """ + Retrieve the value from this result, if it exists. If this result + represents failure, this will be None. + """ return self.__value @property def error(self) -> Optional[str]: + """ + Retrieve the error from this result, if it exists. If this result + represents success, this will be None. + """ return self.__error @property def exception(self) -> Optional[Exception]: + """ + Retrieve the exception from this result, if it exists. If this result + represents success, this will be None. + """ + return self.__exception + + +T = TypeVar("T") +E = TypeVar("E") + + +@dataclass(frozen=True) +class _Success(Generic[T]): + value: T + + +@dataclass(frozen=True) +class _Fail(Generic[E]): + error: E + exception: Optional[Exception] = None + + +_Result = Union[_Success[T], _Fail[E]] diff --git a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py new file mode 100644 index 00000000..7acd6f8e --- /dev/null +++ b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py @@ -0,0 +1,152 @@ +# pylint: disable=missing-docstring + +import json +from typing import Optional + +from ldclient.impl.datasourcev2.polling import ( + PollingDataSource, + PollingResult, + Selector, + polling_payload_to_changeset, +) +from ldclient.impl.datasystem.protocolv2 import ChangeSetBuilder, IntentCode +from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success + + +class MockExceptionThrowingPollingRequester: # pylint: disable=too-few-public-methods + def fetch(self, selector: Optional[Selector]) -> PollingResult: + raise Exception("This is a mock exception for testing purposes.") + + +class MockPollingRequester: # pylint: disable=too-few-public-methods + def __init__(self, result: PollingResult): + self._result = result + + def fetch(self, selector: Optional[Selector]) -> PollingResult: + return self._result + + +def test_polling_has_a_name(): + mock_requester = MockPollingRequester(_Fail(error="failure message")) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + assert ds.name() == "PollingDataSourceV2" + + +def test_error_is_returned_on_failure(): + mock_requester = MockPollingRequester(_Fail(error="failure message")) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch() + + assert result.value is None + assert not result.is_success() + assert result.error == "failure message" + assert result.exception is None + + +def test_error_is_recoverable(): + mock_requester = MockPollingRequester( + _Fail(error="failure message", exception=UnsuccessfulResponseException(408)) + ) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch() + + assert result.value is None + assert not result.is_success() + assert result.error is not None + assert result.error.startswith("Received HTTP error 408") + assert isinstance(result.exception, UnsuccessfulResponseException) + + +def test_error_is_unrecoverable(): + mock_requester = MockPollingRequester( + _Fail(error="failure message", exception=UnsuccessfulResponseException(401)) + ) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch() + + assert result.value is None + assert not result.is_success() + assert result.error is not None + assert result.error.startswith("Received HTTP error 401") + assert isinstance(result.exception, UnsuccessfulResponseException) + + +def test_handles_transfer_none(): + mock_requester = MockPollingRequester( + _Success(value=(ChangeSetBuilder.no_changes(), {})) + ) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch() + + assert result.is_success() + assert result.value is not None + + assert result.value.change_set.intent_code == IntentCode.TRANSFER_NONE + assert result.value.change_set.changes == [] + assert result.value.persist is False + + assert result.error is None + assert result.exception is None + + +def test_handles_uncaught_exception(): + mock_requester = MockExceptionThrowingPollingRequester() + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch() + + assert result.value is None + assert not result.is_success() + assert result.error is not None + assert ( + result.error + == "Error: Exception encountered when updating flags. This is a mock exception for testing purposes." + ) + assert isinstance(result.exception, Exception) + + +def test_handles_transfer_full(): + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' + change_set_result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(change_set_result, _Success) + + mock_requester = MockPollingRequester(_Success(value=(change_set_result.value, {}))) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch() + + assert result.is_success() + assert result.value is not None + + assert result.value.change_set.intent_code == IntentCode.TRANSFER_FULL + assert len(result.value.change_set.changes) == 1 + assert result.value.persist is True + + assert result.error is None + assert result.exception is None + + +def test_handles_transfer_changes(): + payload_str = '{"events":[{"event": "server-intent","data": {"payloads":[{"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":462,"intentCode":"xfer-changes","reason":"stale"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":462,"object":{"key":"sample-feature","on":true,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":113,"deleted":false}}},{"event": "payload-transferred","data": {"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:462)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":462}}]}' + change_set_result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(change_set_result, _Success) + + mock_requester = MockPollingRequester(_Success(value=(change_set_result.value, {}))) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch() + + assert result.is_success() + assert result.value is not None + + assert result.value.change_set.intent_code == IntentCode.TRANSFER_CHANGES + assert len(result.value.change_set.changes) == 1 + assert result.value.persist is True + + assert result.error is None + assert result.exception is None diff --git a/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py new file mode 100644 index 00000000..832b593f --- /dev/null +++ b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py @@ -0,0 +1,150 @@ +import json + +from ldclient.impl.datasourcev2.polling import IntentCode, polling_payload_to_changeset +from ldclient.impl.datasystem.protocolv2 import ChangeType, ObjectKind +from ldclient.impl.util import _Fail, _Success + + +def test_payload_is_missing_events_key(): + data = {} + result = polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert result.error == "Invalid payload: 'events' key is missing or not a list" + + +def test_payload_events_value_is_invalid(): + data = {"events": "not a list"} + result = polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert result.error == "Invalid payload: 'events' key is missing or not a list" + + +def test_payload_event_is_invalid(): + data = {"events": ["this should be a dictionary"]} + result = polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert result.error == "Invalid payload: 'events' must be a list of objects" + + +def test_missing_protocol_events(): + data = {"events": []} + result = polling_payload_to_changeset(data) + assert isinstance(result, _Fail) + assert result.error == "didn't receive any known protocol events in polling payload" + + +def test_transfer_none(): + payload_str = '{"events":[{"event": "server-intent","data": {"payloads":[{"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":462,"intentCode":"none","reason":"up-to-date"}]}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + + assert isinstance(result, _Success) + + change_set = result.value + assert change_set.intent_code == IntentCode.TRANSFER_NONE + assert len(change_set.changes) == 0 + assert change_set.selector is None + + +def test_transfer_full_with_empty_payload(): + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + + assert isinstance(result, _Success) + + change_set = result.value + assert change_set.intent_code == IntentCode.TRANSFER_FULL + assert len(change_set.changes) == 0 + assert change_set.selector is not None + assert change_set.selector.state == "(p:5A46PZ79FQ9D08YYKT79DECDNV:461)" + assert change_set.selector.version == 461 + + +def test_server_intent_decoding_fails(): + payload_str = '{"events":[ {"event":"server-intent","data":{}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Fail) + assert result.error == "Invalid JSON in server intent" + assert isinstance(result.exception, ValueError) + + +def test_processes_put_object(): + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Success) + + change_set = result.value + assert change_set.intent_code == IntentCode.TRANSFER_FULL + assert len(change_set.changes) == 1 + + assert change_set.changes[0].action == ChangeType.PUT + assert change_set.changes[0].kind == ObjectKind.FLAG + assert change_set.changes[0].key == "sample-feature" + assert change_set.changes[0].version == 461 + assert isinstance(change_set.changes[0].object, dict) + + assert change_set.selector is not None + assert change_set.selector.state == "(p:5A46PZ79FQ9D08YYKT79DECDNV:461)" + assert change_set.selector.version == 461 + + +def test_processes_delete_object(): + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "delete-object","data": {"key":"sample-feature","kind":"flag","version":461}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Success) + + change_set = result.value + assert change_set.intent_code == IntentCode.TRANSFER_FULL + assert len(change_set.changes) == 1 + + assert change_set.changes[0].action == ChangeType.DELETE + assert change_set.changes[0].kind == ObjectKind.FLAG + assert change_set.changes[0].key == "sample-feature" + assert change_set.changes[0].version == 461 + assert change_set.changes[0].object is None + + assert change_set.selector is not None + assert change_set.selector.state == "(p:5A46PZ79FQ9D08YYKT79DECDNV:461)" + assert change_set.selector.version == 461 + + +def test_handles_invalid_put_object(): + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Fail) + assert result.error == "Invalid JSON in put object" + + +def test_handles_invalid_delete_object(): + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "delete-object","data": {}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Fail) + assert result.error == "Invalid JSON in delete object" + + +def test_handles_invalid_payload_transferred(): + payload_str = '{"events":[ {"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event":"payload-transferred","data":{}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Fail) + assert result.error == "Invalid JSON in payload transferred object" + + +def test_fails_if_starts_with_transferred(): + payload_str = '{"events":[ {"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}},{"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Fail) + assert result.error == "Invalid JSON in payload transferred object" + assert result.exception is not None + assert ( + result.exception.args[0] == "changeset: cannot complete without a server-intent" + ) + + +def test_fails_if_starts_with_put(): + payload_str = '{"events":[ {"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":461,"object":{"key":"sample-feature","on":false,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":112,"deleted":false}}},{"event":"payload-transferred","data":{"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:461)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":461}},{"event":"server-intent","data":{"payloads":[ {"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":461,"intentCode":"xfer-full","reason":"payload-missing"}]}}]}' + result = polling_payload_to_changeset(json.loads(payload_str)) + assert isinstance(result, _Fail) + assert result.error == "Invalid JSON in payload transferred object" + assert result.exception is not None + assert ( + result.exception.args[0] == "changeset: cannot complete without a server-intent" + ) From 0fd0afddfb9336d9a12fe721dceb9d6e2ff99740 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 24 Jul 2025 13:48:17 -0400 Subject: [PATCH 2/5] Fix some sorting --- ldclient/impl/datasourcev2/polling.py | 4 ++-- .../testing/impl/datasourcev2/test_polling_initializer.py | 2 +- .../impl/datasourcev2/test_polling_payload_parsing.py | 5 ++++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index a119536b..0a46e4f7 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -20,7 +20,7 @@ IntentCode, PutObject, Selector, - ServerIntent, + ServerIntent ) from ldclient.impl.http import _http_factory from ldclient.impl.repeating_task import RepeatingTask @@ -33,7 +33,7 @@ _Success, http_error_message, is_http_error_recoverable, - log, + log ) POLLING_ENDPOINT = "/sdk/poll" diff --git a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py index 7acd6f8e..9274d883 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py @@ -7,7 +7,7 @@ PollingDataSource, PollingResult, Selector, - polling_payload_to_changeset, + polling_payload_to_changeset ) from ldclient.impl.datasystem.protocolv2 import ChangeSetBuilder, IntentCode from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success diff --git a/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py index 832b593f..dae87706 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_payload_parsing.py @@ -1,6 +1,9 @@ import json -from ldclient.impl.datasourcev2.polling import IntentCode, polling_payload_to_changeset +from ldclient.impl.datasourcev2.polling import ( + IntentCode, + polling_payload_to_changeset +) from ldclient.impl.datasystem.protocolv2 import ChangeType, ObjectKind from ldclient.impl.util import _Fail, _Success From 69ce8d74764763988d7b02ba4aa9a21ed7be36ed Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 30 Jul 2025 15:27:03 -0400 Subject: [PATCH 3/5] address bugbot feedback --- ldclient/impl/datasourcev2/polling.py | 8 ++++---- ldclient/impl/datasystem/protocolv2.py | 10 ++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index 0a46e4f7..8a99317d 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -20,7 +20,7 @@ IntentCode, PutObject, Selector, - ServerIntent + ServerIntent, ) from ldclient.impl.http import _http_factory from ldclient.impl.repeating_task import RepeatingTask @@ -33,7 +33,7 @@ _Success, http_error_message, is_http_error_recoverable, - log + log, ) POLLING_ENDPOINT = "/sdk/poll" @@ -162,11 +162,11 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult: if selector is not None: query_params["selector"] = selector.state + uri = self._poll_uri if len(query_params) > 0: filter_query = parse.urlencode(query_params) - self._poll_uri += f"?{filter_query}" + uri += f"?{filter_query}" - uri = self._poll_uri hdrs = _headers(self._config) hdrs["Accept-Encoding"] = "gzip" diff --git a/ldclient/impl/datasystem/protocolv2.py b/ldclient/impl/datasystem/protocolv2.py index 8a29ac82..95834bf6 100644 --- a/ldclient/impl/datasystem/protocolv2.py +++ b/ldclient/impl/datasystem/protocolv2.py @@ -212,13 +212,13 @@ def from_dict(data: dict) -> "DeleteObject": Deserializes a DeleteObject from a JSON-compatible dictionary. """ version = data.get("version") - kind = ObjectKind(data.get("kind")) + kind = data.get("kind") key = data.get("key") if version is None or kind is None or key is None: raise ValueError("Missing required fields in DeleteObject JSON.") - return DeleteObject(version=version, kind=kind, key=key) + return DeleteObject(version=version, kind=ObjectKind(kind), key=key) @dataclass(frozen=True) @@ -261,14 +261,16 @@ def from_dict(data: dict) -> "PutObject": Deserializes a PutObject from a JSON-compatible dictionary. """ version = data.get("version") - kind = ObjectKind(data.get("kind")) + kind = data.get("kind") key = data.get("key") object_data = data.get("object") if version is None or kind is None or key is None or object_data is None: raise ValueError("Missing required fields in PutObject JSON.") - return PutObject(version=version, kind=kind, key=key, object=object_data) + return PutObject( + version=version, kind=ObjectKind(kind), key=key, object=object_data + ) @dataclass(frozen=True) From 15800b9cba5b55763b929165f2b749643fa90742 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 30 Jul 2025 15:33:47 -0400 Subject: [PATCH 4/5] no trailing comma I guess --- ldclient/impl/datasourcev2/polling.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index 8a99317d..0f57ddd2 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -20,7 +20,7 @@ IntentCode, PutObject, Selector, - ServerIntent, + ServerIntent ) from ldclient.impl.http import _http_factory from ldclient.impl.repeating_task import RepeatingTask @@ -33,7 +33,7 @@ _Success, http_error_message, is_http_error_recoverable, - log, + log ) POLLING_ENDPOINT = "/sdk/poll" From e4c996be6cf8814d8f25cfa0fe32b3aab6bd2822 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 30 Jul 2025 15:51:15 -0400 Subject: [PATCH 5/5] more feedback --- ldclient/impl/datasourcev2/polling.py | 4 +++- ldclient/impl/datasystem/protocolv2.py | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index 0f57ddd2..6b05e9f6 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -231,7 +231,9 @@ def polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]: if not isinstance(event, dict): return _Fail(error="Invalid payload: 'events' must be a list of objects") - for event in data["events"]: + if "event" not in event: + continue + if event["event"] == EventName.SERVER_INTENT: try: server_intent = ServerIntent.from_dict(event["data"]) diff --git a/ldclient/impl/datasystem/protocolv2.py b/ldclient/impl/datasystem/protocolv2.py index 95834bf6..477a8479 100644 --- a/ldclient/impl/datasystem/protocolv2.py +++ b/ldclient/impl/datasystem/protocolv2.py @@ -113,10 +113,17 @@ def from_dict(data: dict) -> "Payload": """ Create a Payload from a dictionary representation. """ + intent_code = data.get("intentCode") + + if intent_code is None or not isinstance(intent_code, str): + raise ValueError( + "Invalid data for Payload: 'intentCode' key is missing or not a string" + ) + return Payload( id=data.get("id", ""), target=data.get("target", 0), - code=IntentCode(data.get("intentCode")), + code=IntentCode(intent_code), reason=data.get("reason", ""), )