diff --git a/ldclient/impl/datasourcev2/__init__.py b/ldclient/impl/datasourcev2/__init__.py index ee2d12ad..96a0318e 100644 --- a/ldclient/impl/datasourcev2/__init__.py +++ b/ldclient/impl/datasourcev2/__init__.py @@ -12,30 +12,35 @@ from abc import abstractmethod from dataclasses import dataclass -from typing import Generator, Iterable, Mapping, Optional, Protocol, Tuple +from typing import Generator, Mapping, Optional, Protocol, Tuple -from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector +from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet from ldclient.impl.util import _Result from ldclient.interfaces import DataSourceErrorInfo, DataSourceState PollingResult = _Result[Tuple[ChangeSet, Mapping], str] -class PollingRequester(Protocol): # pylint: disable=too-few-public-methods +BasisResult = _Result[Basis, str] + + +class Initializer(Protocol): # pylint: disable=too-few-public-methods """ - PollingRequester allows PollingDataSource to delegate fetching data to - another component. + Initializer represents a component capable of retrieving a single data + result, such as from the LD polling API. - This is useful for testing the PollingDataSource without needing to set up - a test HTTP server. + The intent of initializers is to quickly fetch an initial set of data, + which may be stale but is fast to retrieve. This initial data serves as a + foundation for a Synchronizer to build upon, enabling it to provide updates + as new changes occur. """ @abstractmethod - def fetch(self, selector: Optional[Selector]) -> PollingResult: + def fetch(self) -> BasisResult: """ - Fetches the data for the given selector. - Returns a Result containing a tuple of ChangeSet and any request headers, - or an error if the data could not be retrieved. + sync should begin the synchronization process for the data source, yielding + Update objects until the connection is closed or an unrecoverable error + occurs. """ raise NotImplementedError @@ -74,4 +79,11 @@ def sync(self) -> Generator[Update, None, None]: raise NotImplementedError -__all__: list[str] = [] +__all__: list[str] = [ + # Initializer-related types + "BasisResult", + "Initializer", + # Synchronizer-related types + "Update", + "Synchronizer", +] diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index 5456342b..0037a179 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -4,13 +4,16 @@ """ import json +from abc import abstractmethod from collections import namedtuple -from typing import Iterable, Optional +from threading import Event +from time import time +from typing import Generator, Optional, Protocol from urllib import parse import urllib3 -from ldclient.impl.datasourcev2 import PollingRequester, PollingResult, Update +from ldclient.impl.datasourcev2 import BasisResult, PollingResult, Update from ldclient.impl.datasystem.protocolv2 import ( Basis, ChangeSet, @@ -25,7 +28,6 @@ from ldclient.impl.http import _http_factory from ldclient.impl.repeating_task import RepeatingTask from ldclient.impl.util import ( - Result, UnsuccessfulResponseException, _Fail, _headers, @@ -35,6 +37,11 @@ is_http_error_recoverable, log ) +from ldclient.interfaces import ( + DataSourceErrorInfo, + DataSourceErrorKind, + DataSourceState +) POLLING_ENDPOINT = "/sdk/poll" @@ -42,6 +49,25 @@ CacheEntry = namedtuple("CacheEntry", ["data", "etag"]) +class Requester(Protocol): # pylint: disable=too-few-public-methods + """ + Requester allows PollingDataSource to delegate fetching data to + another component. + + This is useful for testing the PollingDataSource without needing to set up + a test HTTP server. + """ + + @abstractmethod + def fetch(self, selector: Optional[Selector]) -> PollingResult: + """ + Fetches the data for the given selector. + Returns a Result containing a tuple of ChangeSet and any request headers, + or an error if the data could not be retrieved. + """ + raise NotImplementedError + + class PollingDataSource: """ PollingDataSource is a data source that can retrieve information from @@ -51,9 +77,11 @@ class PollingDataSource: def __init__( self, poll_interval: float, - requester: PollingRequester, + requester: Requester, ): self._requester = requester + self._poll_interval = poll_interval + self._event = Event() self._task = RepeatingTask( "ldclient.datasource.polling", poll_interval, 0, self._poll ) @@ -62,21 +90,73 @@ def name(self) -> str: """Returns the name of the initializer.""" return "PollingDataSourceV2" - def fetch(self) -> Result: # Result[Basis]: + def fetch(self) -> BasisResult: """ Fetch returns a Basis, or an error if the Basis could not be retrieved. """ return self._poll() - # TODO(fdv2): This will need to be converted into a synchronizer at some point. - # def start(self): - # log.info( - # "Starting PollingUpdateProcessor with request interval: " - # + str(self._config.poll_interval) - # ) - # self._task.start() + def sync(self) -> Generator[Update, None, None]: + """ + sync begins the synchronization process for the data source, yielding + Update objects until the connection is closed or an unrecoverable error + occurs. + """ + log.info("Starting PollingDataSourceV2 synchronizer") + while True: + result = self._requester.fetch(None) + if isinstance(result, _Fail): + if isinstance(result.exception, UnsuccessfulResponseException): + error_info = DataSourceErrorInfo( + kind=DataSourceErrorKind.ERROR_RESPONSE, + status_code=result.exception.status, + time=time(), + message=http_error_message( + result.exception.status, "polling request" + ), + ) + + status_code = result.exception.status + if is_http_error_recoverable(status_code): + # TODO(fdv2): Add support for environment ID + yield Update( + state=DataSourceState.INTERRUPTED, + error=error_info, + ) + continue + + # TODO(fdv2): Add support for environment ID + yield Update( + state=DataSourceState.OFF, + error=error_info, + ) + break - def _poll(self) -> Result: # Result[Basis]: + error_info = DataSourceErrorInfo( + kind=DataSourceErrorKind.NETWORK_ERROR, + time=time(), + status_code=0, + message=result.error, + ) + + # TODO(fdv2): Go has a designation here to handle JSON decoding separately. + # TODO(fdv2): Add support for environment ID + yield Update( + state=DataSourceState.INTERRUPTED, + error=error_info, + ) + else: + (change_set, headers) = result.value + yield Update( + state=DataSourceState.VALID, + change_set=change_set, + environment_id=headers.get("X-LD-EnvID"), + ) + + if self._event.wait(self._poll_interval): + break + + def _poll(self) -> BasisResult: try: # TODO(fdv2): Need to pass the selector through result = self._requester.fetch(None) @@ -90,10 +170,13 @@ def _poll(self) -> Result: # Result[Basis]: if is_http_error_recoverable(status_code): log.warning(http_error_message_result) - return Result.fail(http_error_message_result, result.exception) + return _Fail( + error=http_error_message_result, exception=result.exception + ) - return Result.fail( - result.error or "Failed to request payload", result.exception + return _Fail( + error=result.error or "Failed to request payload", + exception=result.exception, ) (change_set, headers) = result.value @@ -108,18 +191,19 @@ def _poll(self) -> Result: # Result[Basis]: environment_id=env_id, ) - return Result.success(basis) - except Exception as e: + return _Success(value=basis) + except Exception as e: # pylint: disable=broad-except msg = f"Error: Exception encountered when updating flags. {e}" log.exception(msg) - return Result.fail(msg, e) + return _Fail(error=msg, exception=e) # pylint: disable=too-few-public-methods class Urllib3PollingRequester: """ - Urllib3PollingRequester is a PollingRequester that uses urllib3 to make HTTP requests. + Urllib3PollingRequester is a Requester that uses urllib3 to make HTTP + requests. """ def __init__(self, config): diff --git a/ldclient/impl/util.py b/ldclient/impl/util.py index b827f88f..4fbaf110 100644 --- a/ldclient/impl/util.py +++ b/ldclient/impl/util.py @@ -260,4 +260,6 @@ class _Fail(Generic[E]): exception: Optional[Exception] = None +# TODO(breaking): Replace the above Result class with an improved generic +# version. _Result = Union[_Success[T], _Fail[E]] diff --git a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py index 9274d883..be2e538f 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py @@ -39,8 +39,7 @@ def test_error_is_returned_on_failure(): result = ds.fetch() - assert result.value is None - assert not result.is_success() + assert isinstance(result, _Fail) assert result.error == "failure message" assert result.exception is None @@ -53,8 +52,7 @@ def test_error_is_recoverable(): result = ds.fetch() - assert result.value is None - assert not result.is_success() + assert isinstance(result, _Fail) assert result.error is not None assert result.error.startswith("Received HTTP error 408") assert isinstance(result.exception, UnsuccessfulResponseException) @@ -68,8 +66,7 @@ def test_error_is_unrecoverable(): result = ds.fetch() - assert result.value is None - assert not result.is_success() + assert isinstance(result, _Fail) assert result.error is not None assert result.error.startswith("Received HTTP error 401") assert isinstance(result.exception, UnsuccessfulResponseException) @@ -83,16 +80,13 @@ def test_handles_transfer_none(): result = ds.fetch() - assert result.is_success() + assert isinstance(result, _Success) assert result.value is not None assert result.value.change_set.intent_code == IntentCode.TRANSFER_NONE assert result.value.change_set.changes == [] assert result.value.persist is False - assert result.error is None - assert result.exception is None - def test_handles_uncaught_exception(): mock_requester = MockExceptionThrowingPollingRequester() @@ -100,8 +94,7 @@ def test_handles_uncaught_exception(): result = ds.fetch() - assert result.value is None - assert not result.is_success() + assert isinstance(result, _Fail) assert result.error is not None assert ( result.error @@ -120,16 +113,13 @@ def test_handles_transfer_full(): result = ds.fetch() - assert result.is_success() + assert isinstance(result, _Success) assert result.value is not None assert result.value.change_set.intent_code == IntentCode.TRANSFER_FULL assert len(result.value.change_set.changes) == 1 assert result.value.persist is True - assert result.error is None - assert result.exception is None - def test_handles_transfer_changes(): payload_str = '{"events":[{"event": "server-intent","data": {"payloads":[{"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":462,"intentCode":"xfer-changes","reason":"stale"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":462,"object":{"key":"sample-feature","on":true,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":113,"deleted":false}}},{"event": "payload-transferred","data": {"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:462)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":462}}]}' @@ -141,12 +131,9 @@ def test_handles_transfer_changes(): result = ds.fetch() - assert result.is_success() + assert isinstance(result, _Success) assert result.value is not None assert result.value.change_set.intent_code == IntentCode.TRANSFER_CHANGES assert len(result.value.change_set.changes) == 1 assert result.value.persist is True - - assert result.error is None - assert result.exception is None diff --git a/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py new file mode 100644 index 00000000..4e1150cd --- /dev/null +++ b/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py @@ -0,0 +1,385 @@ +import json +from typing import Iterator, Optional + +import pytest +from ld_eventsource.sse_client import Event + +from ldclient.impl.datasourcev2 import PollingResult +from ldclient.impl.datasourcev2.polling import PollingDataSource +from ldclient.impl.datasystem.protocolv2 import ( + ChangeSetBuilder, + ChangeType, + DeleteObject, + Error, + EventName, + Goodbye, + IntentCode, + ObjectKind, + Payload, + PutObject, + Selector, + ServerIntent +) +from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success +from ldclient.interfaces import DataSourceErrorKind, DataSourceState + + +class ListBasedRequester: + def __init__(self, results: Iterator[PollingResult]): + self._results = results + self._index = 0 + + def fetch( + self, selector: Optional[Selector] + ) -> PollingResult: # pylint: disable=unused-argument + return next(self._results) + + +@pytest.fixture +def events() -> dict: + server_intent = ServerIntent( + payload=Payload( + id="id", + target=300, + code=IntentCode.TRANSFER_FULL, + reason="cant-catchup", + ) + ) + intent_event = Event( + event=EventName.SERVER_INTENT, + data=json.dumps(server_intent.to_dict()), + ) + + put = PutObject( + version=100, kind=ObjectKind.FLAG, key="flag-key", object={"key": "flag-key"} + ) + put_event = Event( + event=EventName.PUT_OBJECT, + data=json.dumps(put.to_dict()), + ) + delete = DeleteObject(version=101, kind=ObjectKind.FLAG, key="flag-key") + delete_event = Event( + event=EventName.DELETE_OBJECT, + data=json.dumps(delete.to_dict()), + ) + + selector = Selector(state="p:SOMETHING:300", version=300) + payload_transferred_event = Event( + event=EventName.PAYLOAD_TRANSFERRED, + data=json.dumps(selector.to_dict()), + ) + + goodbye = Goodbye(reason="test reason", silent=True, catastrophe=False) + goodbye_event = Event( + event=EventName.GOODBYE, + data=json.dumps(goodbye.to_dict()), + ) + + error = Error(payload_id="p:SOMETHING:300", reason="test reason") + error_event = Event( + event=EventName.ERROR, + data=json.dumps(error.to_dict()), + ) + + heartbeat_event = Event(event=EventName.HEARTBEAT) + + return { + EventName.SERVER_INTENT: intent_event, + EventName.PAYLOAD_TRANSFERRED: payload_transferred_event, + EventName.PUT_OBJECT: put_event, + EventName.DELETE_OBJECT: delete_event, + EventName.GOODBYE: goodbye_event, + EventName.ERROR: error_event, + EventName.HEARTBEAT: heartbeat_event, + } + + +def test_handles_no_changes(): + change_set = ChangeSetBuilder.no_changes() + headers = {} + polling_result: PollingResult = _Success(value=(change_set, headers)) + + synchronizer = PollingDataSource( + poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result])) + ) + + valid = next(synchronizer.sync()) + + assert valid.state == DataSourceState.VALID + assert valid.error is None + assert valid.revert_to_fdv1 is False + assert valid.environment_id is None + assert valid.change_set is not None + assert valid.change_set.intent_code == IntentCode.TRANSFER_NONE + assert len(valid.change_set.changes) == 0 + + +def test_handles_empty_changeset(): + builder = ChangeSetBuilder() + builder.start(intent=IntentCode.TRANSFER_FULL) + change_set = builder.finish(selector=Selector(state="p:SOMETHING:300", version=300)) + headers = {} + polling_result: PollingResult = _Success(value=(change_set, headers)) + + synchronizer = PollingDataSource( + poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result])) + ) + valid = next(synchronizer.sync()) + + assert valid.state == DataSourceState.VALID + assert valid.error is None + assert valid.revert_to_fdv1 is False + assert valid.environment_id is None + + assert valid.change_set is not None + assert len(valid.change_set.changes) == 0 + assert valid.change_set.selector is not None + assert valid.change_set.selector.version == 300 + assert valid.change_set.selector.state == "p:SOMETHING:300" + assert valid.change_set.intent_code == IntentCode.TRANSFER_FULL + + +def test_handles_put_objects(): + builder = ChangeSetBuilder() + builder.start(intent=IntentCode.TRANSFER_FULL) + builder.add_put( + version=100, kind=ObjectKind.FLAG, key="flag-key", obj={"key": "flag-key"} + ) + change_set = builder.finish(selector=Selector(state="p:SOMETHING:300", version=300)) + headers = {} + polling_result: PollingResult = _Success(value=(change_set, headers)) + + synchronizer = PollingDataSource( + poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result])) + ) + valid = next(synchronizer.sync()) + + assert valid.state == DataSourceState.VALID + assert valid.error is None + assert valid.revert_to_fdv1 is False + assert valid.environment_id is None + + assert valid.change_set is not None + assert len(valid.change_set.changes) == 1 + assert valid.change_set.changes[0].action == ChangeType.PUT + assert valid.change_set.changes[0].kind == ObjectKind.FLAG + assert valid.change_set.changes[0].key == "flag-key" + assert valid.change_set.changes[0].object == {"key": "flag-key"} + assert valid.change_set.changes[0].version == 100 + assert valid.change_set.selector is not None + assert valid.change_set.selector.version == 300 + assert valid.change_set.selector.state == "p:SOMETHING:300" + assert valid.change_set.intent_code == IntentCode.TRANSFER_FULL + + +def test_handles_delete_objects(): + builder = ChangeSetBuilder() + builder.start(intent=IntentCode.TRANSFER_FULL) + builder.add_delete(version=101, kind=ObjectKind.FLAG, key="flag-key") + change_set = builder.finish(selector=Selector(state="p:SOMETHING:300", version=300)) + headers = {} + polling_result: PollingResult = _Success(value=(change_set, headers)) + + synchronizer = PollingDataSource( + poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result])) + ) + valid = next(synchronizer.sync()) + + assert valid.state == DataSourceState.VALID + assert valid.error is None + assert valid.revert_to_fdv1 is False + assert valid.environment_id is None + + assert valid.change_set is not None + assert len(valid.change_set.changes) == 1 + assert valid.change_set.changes[0].action == ChangeType.DELETE + assert valid.change_set.changes[0].kind == ObjectKind.FLAG + assert valid.change_set.changes[0].key == "flag-key" + assert valid.change_set.changes[0].version == 101 + assert valid.change_set.selector is not None + assert valid.change_set.selector.version == 300 + assert valid.change_set.selector.state == "p:SOMETHING:300" + assert valid.change_set.intent_code == IntentCode.TRANSFER_FULL + + +# def test_swallows_goodbye(events): # pylint: disable=redefined-outer-name +# builder = list_sse_client( +# [ +# events[EventName.SERVER_INTENT], +# events[EventName.GOODBYE], +# events[EventName.PAYLOAD_TRANSFERRED], +# ] +# ) +# +# synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) +# updates = list(synchronizer.sync()) +# +# builder = ChangeSetBuilder() +# builder.start(intent=IntentCode.TRANSFER_FULL) +# change_set = builder.finish(selector=Selector(state="p:SOMETHING:300", version=300)) +# headers = {} +# polling_result: PollingResult = _Success(value=(change_set, headers)) +# +# synchronizer = PollingDataSource( +# poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result])) +# ) +# updates = list(synchronizer.sync()) +# +# assert len(updates) == 1 +# assert updates[0].state == DataSourceState.VALID +# assert updates[0].error is None +# assert updates[0].revert_to_fdv1 is False +# assert updates[0].environment_id is None +# +# assert updates[0].change_set is not None +# assert len(updates[0].change_set.changes) == 1 +# assert updates[0].change_set.changes[0].action == ChangeType.DELETE +# assert updates[0].change_set.changes[0].kind == ObjectKind.FLAG +# assert updates[0].change_set.changes[0].key == "flag-key" +# assert updates[0].change_set.changes[0].version == 101 +# assert updates[0].change_set.selector is not None +# assert updates[0].change_set.selector.version == 300 +# assert updates[0].change_set.selector.state == "p:SOMETHING:300" +# assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL +# +# assert len(updates) == 1 +# assert updates[0].state == DataSourceState.VALID +# assert updates[0].error is None +# assert updates[0].revert_to_fdv1 is False +# assert updates[0].environment_id is None +# +# assert updates[0].change_set is not None +# assert len(updates[0].change_set.changes) == 0 +# assert updates[0].change_set.selector is not None +# assert updates[0].change_set.selector.version == 300 +# assert updates[0].change_set.selector.state == "p:SOMETHING:300" +# assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL +# +# +# def test_swallows_heartbeat(events): # pylint: disable=redefined-outer-name +# builder = list_sse_client( +# [ +# events[EventName.SERVER_INTENT], +# events[EventName.HEARTBEAT], +# events[EventName.PAYLOAD_TRANSFERRED], +# ] +# ) +# +# synchronizer = StreamingSynchronizer(Config(sdk_key="key"), builder) +# updates = list(synchronizer.sync()) +# +# assert len(updates) == 1 +# assert updates[0].state == DataSourceState.VALID +# assert updates[0].error is None +# assert updates[0].revert_to_fdv1 is False +# assert updates[0].environment_id is None +# +# assert updates[0].change_set is not None +# assert len(updates[0].change_set.changes) == 0 +# assert updates[0].change_set.selector is not None +# assert updates[0].change_set.selector.version == 300 +# assert updates[0].change_set.selector.state == "p:SOMETHING:300" +# assert updates[0].change_set.intent_code == IntentCode.TRANSFER_FULL +# +# +def test_generic_error_interrupts_and_recovers(): + builder = ChangeSetBuilder() + builder.start(intent=IntentCode.TRANSFER_FULL) + builder.add_delete(version=101, kind=ObjectKind.FLAG, key="flag-key") + change_set = builder.finish(selector=Selector(state="p:SOMETHING:300", version=300)) + headers = {} + polling_result: PollingResult = _Success(value=(change_set, headers)) + + synchronizer = PollingDataSource( + poll_interval=0.01, + requester=ListBasedRequester( + results=iter([_Fail(error="error for test"), polling_result]) + ), + ) + sync = synchronizer.sync() + interrupted = next(sync) + valid = next(sync) + + assert interrupted.state == DataSourceState.INTERRUPTED + assert interrupted.error is not None + assert interrupted.error.kind == DataSourceErrorKind.NETWORK_ERROR + assert interrupted.error.status_code == 0 + assert interrupted.error.message == "error for test" + assert interrupted.revert_to_fdv1 is False + assert interrupted.environment_id is None + + assert valid.change_set is not None + assert len(valid.change_set.changes) == 1 + assert valid.change_set.intent_code == IntentCode.TRANSFER_FULL + assert valid.change_set.changes[0].action == ChangeType.DELETE + + +def test_recoverable_error_continues(): + builder = ChangeSetBuilder() + builder.start(intent=IntentCode.TRANSFER_FULL) + builder.add_delete(version=101, kind=ObjectKind.FLAG, key="flag-key") + change_set = builder.finish(selector=Selector(state="p:SOMETHING:300", version=300)) + headers = {} + polling_result: PollingResult = _Success(value=(change_set, headers)) + + _failure = _Fail(error="error for test", exception=UnsuccessfulResponseException(status=408)) + + synchronizer = PollingDataSource( + poll_interval=0.01, + requester=ListBasedRequester( + results=iter([_failure, polling_result]) + ), + ) + sync = synchronizer.sync() + interrupted = next(sync) + valid = next(sync) + + assert interrupted.state == DataSourceState.INTERRUPTED + assert interrupted.error is not None + assert interrupted.error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert interrupted.error.status_code == 408 + assert interrupted.revert_to_fdv1 is False + assert interrupted.environment_id is None + + assert valid.state == DataSourceState.VALID + assert valid.error is None + assert valid.revert_to_fdv1 is False + assert valid.environment_id is None + + assert valid.change_set is not None + assert len(valid.change_set.changes) == 1 + assert valid.change_set.intent_code == IntentCode.TRANSFER_FULL + assert valid.change_set.changes[0].action == ChangeType.DELETE + + +def test_unrecoverable_error_shuts_down(): + builder = ChangeSetBuilder() + builder.start(intent=IntentCode.TRANSFER_FULL) + builder.add_delete(version=101, kind=ObjectKind.FLAG, key="flag-key") + change_set = builder.finish(selector=Selector(state="p:SOMETHING:300", version=300)) + headers = {} + polling_result: PollingResult = _Success(value=(change_set, headers)) + + _failure = _Fail(error="error for test", exception=UnsuccessfulResponseException(status=401)) + + synchronizer = PollingDataSource( + poll_interval=0.01, + requester=ListBasedRequester( + results=iter([_failure, polling_result]) + ), + ) + sync = synchronizer.sync() + off = next(sync) + assert off.state == DataSourceState.OFF + assert off.error is not None + assert off.error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert off.error.status_code == 401 + assert off.revert_to_fdv1 is False + assert off.environment_id is None + assert off.change_set is None + + try: + next(sync) + assert False, "Expected StopIteration" + except StopIteration: + pass