Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions ldclient/impl/datasourcev2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,35 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Generator, Iterable, Mapping, Optional, Protocol, Tuple
from typing import Generator, Mapping, Optional, Protocol, Tuple

from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector
from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet
from ldclient.impl.util import _Result
from ldclient.interfaces import DataSourceErrorInfo, DataSourceState

PollingResult = _Result[Tuple[ChangeSet, Mapping], str]


class PollingRequester(Protocol): # pylint: disable=too-few-public-methods
BasisResult = _Result[Basis, str]


class Initializer(Protocol): # pylint: disable=too-few-public-methods
"""
PollingRequester allows PollingDataSource to delegate fetching data to
another component.
Initializer represents a component capable of retrieving a single data
result, such as from the LD polling API.

This is useful for testing the PollingDataSource without needing to set up
a test HTTP server.
The intent of initializers is to quickly fetch an initial set of data,
which may be stale but is fast to retrieve. This initial data serves as a
foundation for a Synchronizer to build upon, enabling it to provide updates
as new changes occur.
"""

@abstractmethod
def fetch(self, selector: Optional[Selector]) -> PollingResult:
def fetch(self) -> BasisResult:
"""
Fetches the data for the given selector.
Returns a Result containing a tuple of ChangeSet and any request headers,
or an error if the data could not be retrieved.
fetch should retrieve a single Basis for the data source, returning it as a
successful Result, or a failed Result carrying an error message if the data
could not be retrieved.
"""
raise NotImplementedError

Expand Down Expand Up @@ -74,4 +79,11 @@ def sync(self) -> Generator[Update, None, None]:
raise NotImplementedError


__all__: list[str] = []
__all__: list[str] = [
# Initializer-related types
"BasisResult",
"Initializer",
# Synchronizer-related types
"Update",
"Synchronizer",
]
124 changes: 104 additions & 20 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
"""

import json
from abc import abstractmethod
from collections import namedtuple
from typing import Iterable, Optional
from threading import Event
from time import time
from typing import Generator, Optional, Protocol
from urllib import parse

import urllib3

from ldclient.impl.datasourcev2 import PollingRequester, PollingResult, Update
from ldclient.impl.datasourcev2 import BasisResult, PollingResult, Update
from ldclient.impl.datasystem.protocolv2 import (
Basis,
ChangeSet,
Expand All @@ -25,7 +28,6 @@
from ldclient.impl.http import _http_factory
from ldclient.impl.repeating_task import RepeatingTask
from ldclient.impl.util import (
Result,
UnsuccessfulResponseException,
_Fail,
_headers,
Expand All @@ -35,13 +37,37 @@
is_http_error_recoverable,
log
)
from ldclient.interfaces import (
DataSourceErrorInfo,
DataSourceErrorKind,
DataSourceState
)

POLLING_ENDPOINT = "/sdk/poll"


CacheEntry = namedtuple("CacheEntry", ["data", "etag"])


class Requester(Protocol):  # pylint: disable=too-few-public-methods
    """
    Structural interface for the component that performs the polling request.

    PollingDataSource hands the actual fetching of data off to a Requester,
    which lets tests supply a stand-in instead of running a test HTTP server.
    """

    @abstractmethod
    def fetch(self, selector: Optional[Selector]) -> PollingResult:
        """
        Retrieve the data for ``selector``.

        On success the Result holds a ``(ChangeSet, headers)`` tuple; when the
        data could not be retrieved it holds an error instead.
        """
        raise NotImplementedError


class PollingDataSource:
"""
PollingDataSource is a data source that can retrieve information from
Expand All @@ -51,9 +77,11 @@ class PollingDataSource:
def __init__(
self,
poll_interval: float,
requester: PollingRequester,
requester: Requester,
):
self._requester = requester
self._poll_interval = poll_interval
self._event = Event()
self._task = RepeatingTask(
"ldclient.datasource.polling", poll_interval, 0, self._poll
)
Expand All @@ -62,21 +90,73 @@ def name(self) -> str:
"""Returns the name of the initializer."""
return "PollingDataSourceV2"

def fetch(self) -> Result: # Result[Basis]:
def fetch(self) -> BasisResult:
"""
Fetch returns a Basis, or an error if the Basis could not be retrieved.
"""
return self._poll()

# TODO(fdv2): This will need to be converted into a synchronizer at some point.
# def start(self):
# log.info(
# "Starting PollingUpdateProcessor with request interval: "
# + str(self._config.poll_interval)
# )
# self._task.start()
def sync(self) -> Generator[Update, None, None]:
    """
    Poll the requester repeatedly, yielding one Update per attempt.

    Each iteration calls the requester and yields:

    * ``VALID`` with the change set (and the ``X-LD-EnvID`` header value as
      the environment id) when the request succeeds;
    * ``INTERRUPTED`` with error details when the request fails with a
      recoverable HTTP status, fails without an HTTP status, or the
      requester raises;
    * ``OFF`` with error details when the HTTP status is not recoverable,
      after which the generator ends.

    After every attempt that does not end the generator, the loop waits up
    to the poll interval on ``self._event`` before trying again, and ends
    early if that event is set during the wait.
    """
    log.info("Starting PollingDataSourceV2 synchronizer")
    while True:
        try:
            result = self._requester.fetch(None)
        except Exception as e:  # pylint: disable=broad-except
            # A requester that raises must not kill the generator; convert
            # the exception into a failure so it is reported as an
            # INTERRUPTED update below, as _poll does for the initializer.
            msg = f"Error: Exception encountered when updating flags. {e}"
            log.exception(msg)
            result = _Fail(error=msg, exception=e)

        if isinstance(result, _Fail):
            if isinstance(result.exception, UnsuccessfulResponseException):
                status_code = result.exception.status
                error_info = DataSourceErrorInfo(
                    kind=DataSourceErrorKind.ERROR_RESPONSE,
                    status_code=status_code,
                    time=time(),
                    message=http_error_message(status_code, "polling request"),
                )

                if not is_http_error_recoverable(status_code):
                    # TODO(fdv2): Add support for environment ID
                    yield Update(
                        state=DataSourceState.OFF,
                        error=error_info,
                    )
                    break

                # Recoverable: report it, then fall through to the interval
                # wait below so the endpoint is not retried in a tight loop
                # and a stop request is still honoured.
                # TODO(fdv2): Add support for environment ID
                yield Update(
                    state=DataSourceState.INTERRUPTED,
                    error=error_info,
                )
            else:
                error_info = DataSourceErrorInfo(
                    kind=DataSourceErrorKind.NETWORK_ERROR,
                    time=time(),
                    status_code=0,
                    message=result.error,
                )

                # TODO(fdv2): Go has a designation here to handle JSON decoding separately.
                # TODO(fdv2): Add support for environment ID
                yield Update(
                    state=DataSourceState.INTERRUPTED,
                    error=error_info,
                )
        else:
            (change_set, headers) = result.value
            yield Update(
                state=DataSourceState.VALID,
                change_set=change_set,
                environment_id=headers.get("X-LD-EnvID"),
            )

        # Event.wait returns True only when the event was set, which is the
        # signal to stop polling; a timeout returns False and polls again.
        if self._event.wait(self._poll_interval):
            break
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: PollingDataSource Fails to Handle Exceptions and Retries Prematurely

The PollingDataSource.sync() method has two issues:

  1. It does not catch exceptions raised by requester.fetch(). Network errors (e.g., timeouts, connection failures) will propagate, terminating the synchronizer generator instead of yielding an INTERRUPTED or OFF Update. This can crash the polling process in production.
  2. On recoverable HTTP errors, sync() immediately retries without waiting for the configured poll interval. This creates a tight loop that can excessively hammer the polling endpoint and consume CPU.
Fix in Cursor Fix in Web


def _poll(self) -> BasisResult:
try:
# TODO(fdv2): Need to pass the selector through
result = self._requester.fetch(None)
Expand All @@ -90,10 +170,13 @@ def _poll(self) -> Result: # Result[Basis]:
if is_http_error_recoverable(status_code):
log.warning(http_error_message_result)

return Result.fail(http_error_message_result, result.exception)
return _Fail(
error=http_error_message_result, exception=result.exception
)

return Result.fail(
result.error or "Failed to request payload", result.exception
return _Fail(
error=result.error or "Failed to request payload",
exception=result.exception,
)

(change_set, headers) = result.value
Expand All @@ -108,18 +191,19 @@ def _poll(self) -> Result: # Result[Basis]:
environment_id=env_id,
)

return Result.success(basis)
except Exception as e:
return _Success(value=basis)
except Exception as e: # pylint: disable=broad-except
msg = f"Error: Exception encountered when updating flags. {e}"
log.exception(msg)

return Result.fail(msg, e)
return _Fail(error=msg, exception=e)


# pylint: disable=too-few-public-methods
class Urllib3PollingRequester:
"""
Urllib3PollingRequester is a PollingRequester that uses urllib3 to make HTTP requests.
Urllib3PollingRequester is a Requester that uses urllib3 to make HTTP
requests.
"""

def __init__(self, config):
Expand Down
2 changes: 2 additions & 0 deletions ldclient/impl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,6 @@ class _Fail(Generic[E]):
exception: Optional[Exception] = None


# TODO(breaking): Replace the above Result class with an improved generic
# version.
_Result = Union[_Success[T], _Fail[E]]
27 changes: 7 additions & 20 deletions ldclient/testing/impl/datasourcev2/test_polling_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def test_error_is_returned_on_failure():

result = ds.fetch()

assert result.value is None
assert not result.is_success()
assert isinstance(result, _Fail)
assert result.error == "failure message"
assert result.exception is None

Expand All @@ -53,8 +52,7 @@ def test_error_is_recoverable():

result = ds.fetch()

assert result.value is None
assert not result.is_success()
assert isinstance(result, _Fail)
assert result.error is not None
assert result.error.startswith("Received HTTP error 408")
assert isinstance(result.exception, UnsuccessfulResponseException)
Expand All @@ -68,8 +66,7 @@ def test_error_is_unrecoverable():

result = ds.fetch()

assert result.value is None
assert not result.is_success()
assert isinstance(result, _Fail)
assert result.error is not None
assert result.error.startswith("Received HTTP error 401")
assert isinstance(result.exception, UnsuccessfulResponseException)
Expand All @@ -83,25 +80,21 @@ def test_handles_transfer_none():

result = ds.fetch()

assert result.is_success()
assert isinstance(result, _Success)
assert result.value is not None

assert result.value.change_set.intent_code == IntentCode.TRANSFER_NONE
assert result.value.change_set.changes == []
assert result.value.persist is False

assert result.error is None
assert result.exception is None


def test_handles_uncaught_exception():
mock_requester = MockExceptionThrowingPollingRequester()
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()

assert result.value is None
assert not result.is_success()
assert isinstance(result, _Fail)
assert result.error is not None
assert (
result.error
Expand All @@ -120,16 +113,13 @@ def test_handles_transfer_full():

result = ds.fetch()

assert result.is_success()
assert isinstance(result, _Success)
assert result.value is not None

assert result.value.change_set.intent_code == IntentCode.TRANSFER_FULL
assert len(result.value.change_set.changes) == 1
assert result.value.persist is True

assert result.error is None
assert result.exception is None


def test_handles_transfer_changes():
payload_str = '{"events":[{"event": "server-intent","data": {"payloads":[{"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":462,"intentCode":"xfer-changes","reason":"stale"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":462,"object":{"key":"sample-feature","on":true,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":113,"deleted":false}}},{"event": "payload-transferred","data": {"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:462)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":462}}]}'
Expand All @@ -141,12 +131,9 @@ def test_handles_transfer_changes():

result = ds.fetch()

assert result.is_success()
assert isinstance(result, _Success)
assert result.value is not None

assert result.value.change_set.intent_code == IntentCode.TRANSFER_CHANGES
assert len(result.value.change_set.changes) == 1
assert result.value.persist is True

assert result.error is None
assert result.exception is None
Loading
Loading