Skip to content

Commit 5398878

Browse files
authored
chore: Implement Synchronizer protocol for polling data source (#345)
1 parent b4aadca commit 5398878

File tree

5 files changed

+522
-52
lines changed

5 files changed

+522
-52
lines changed

ldclient/impl/datasourcev2/__init__.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,35 @@
1212

1313
from abc import abstractmethod
1414
from dataclasses import dataclass
15-
from typing import Generator, Iterable, Mapping, Optional, Protocol, Tuple
15+
from typing import Generator, Mapping, Optional, Protocol, Tuple
1616

17-
from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector
17+
from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet
1818
from ldclient.impl.util import _Result
1919
from ldclient.interfaces import DataSourceErrorInfo, DataSourceState
2020

2121
PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
2222

2323

24-
class PollingRequester(Protocol): # pylint: disable=too-few-public-methods
24+
BasisResult = _Result[Basis, str]
25+
26+
27+
class Initializer(Protocol): # pylint: disable=too-few-public-methods
2528
"""
26-
PollingRequester allows PollingDataSource to delegate fetching data to
27-
another component.
29+
Initializer represents a component capable of retrieving a single data
30+
result, such as from the LD polling API.
2831
29-
This is useful for testing the PollingDataSource without needing to set up
30-
a test HTTP server.
32+
The intent of initializers is to quickly fetch an initial set of data,
33+
which may be stale but is fast to retrieve. This initial data serves as a
34+
foundation for a Synchronizer to build upon, enabling it to provide updates
35+
as new changes occur.
3136
"""
3237

3338
@abstractmethod
34-
def fetch(self, selector: Optional[Selector]) -> PollingResult:
39+
def fetch(self) -> BasisResult:
3540
"""
36-
Fetches the data for the given selector.
37-
Returns a Result containing a tuple of ChangeSet and any request headers,
38-
or an error if the data could not be retrieved.
41+
sync should begin the synchronization process for the data source, yielding
42+
Update objects until the connection is closed or an unrecoverable error
43+
occurs.
3944
"""
4045
raise NotImplementedError
4146

@@ -74,4 +79,11 @@ def sync(self) -> Generator[Update, None, None]:
7479
raise NotImplementedError
7580

7681

77-
__all__: list[str] = []
82+
__all__: list[str] = [
83+
# Initializer-related types
84+
"BasisResult",
85+
"Initializer",
86+
# Synchronizer-related types
87+
"Update",
88+
"Synchronizer",
89+
]

ldclient/impl/datasourcev2/polling.py

Lines changed: 104 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
"""
55

66
import json
7+
from abc import abstractmethod
78
from collections import namedtuple
8-
from typing import Iterable, Optional
9+
from threading import Event
10+
from time import time
11+
from typing import Generator, Optional, Protocol
912
from urllib import parse
1013

1114
import urllib3
1215

13-
from ldclient.impl.datasourcev2 import PollingRequester, PollingResult, Update
16+
from ldclient.impl.datasourcev2 import BasisResult, PollingResult, Update
1417
from ldclient.impl.datasystem.protocolv2 import (
1518
Basis,
1619
ChangeSet,
@@ -25,7 +28,6 @@
2528
from ldclient.impl.http import _http_factory
2629
from ldclient.impl.repeating_task import RepeatingTask
2730
from ldclient.impl.util import (
28-
Result,
2931
UnsuccessfulResponseException,
3032
_Fail,
3133
_headers,
@@ -35,13 +37,37 @@
3537
is_http_error_recoverable,
3638
log
3739
)
40+
from ldclient.interfaces import (
41+
DataSourceErrorInfo,
42+
DataSourceErrorKind,
43+
DataSourceState
44+
)
3845

3946
POLLING_ENDPOINT = "/sdk/poll"
4047

4148

4249
CacheEntry = namedtuple("CacheEntry", ["data", "etag"])
4350

4451

52+
class Requester(Protocol): # pylint: disable=too-few-public-methods
53+
"""
54+
Requester allows PollingDataSource to delegate fetching data to
55+
another component.
56+
57+
This is useful for testing the PollingDataSource without needing to set up
58+
a test HTTP server.
59+
"""
60+
61+
@abstractmethod
62+
def fetch(self, selector: Optional[Selector]) -> PollingResult:
63+
"""
64+
Fetches the data for the given selector.
65+
Returns a Result containing a tuple of ChangeSet and any request headers,
66+
or an error if the data could not be retrieved.
67+
"""
68+
raise NotImplementedError
69+
70+
4571
class PollingDataSource:
4672
"""
4773
PollingDataSource is a data source that can retrieve information from
@@ -51,9 +77,11 @@ class PollingDataSource:
5177
def __init__(
5278
self,
5379
poll_interval: float,
54-
requester: PollingRequester,
80+
requester: Requester,
5581
):
5682
self._requester = requester
83+
self._poll_interval = poll_interval
84+
self._event = Event()
5785
self._task = RepeatingTask(
5886
"ldclient.datasource.polling", poll_interval, 0, self._poll
5987
)
@@ -62,21 +90,73 @@ def name(self) -> str:
6290
"""Returns the name of the initializer."""
6391
return "PollingDataSourceV2"
6492

65-
def fetch(self) -> Result: # Result[Basis]:
93+
def fetch(self) -> BasisResult:
6694
"""
6795
Fetch returns a Basis, or an error if the Basis could not be retrieved.
6896
"""
6997
return self._poll()
7098

71-
# TODO(fdv2): This will need to be converted into a synchronizer at some point.
72-
# def start(self):
73-
# log.info(
74-
# "Starting PollingUpdateProcessor with request interval: "
75-
# + str(self._config.poll_interval)
76-
# )
77-
# self._task.start()
99+
def sync(self) -> Generator[Update, None, None]:
100+
"""
101+
sync begins the synchronization process for the data source, yielding
102+
Update objects until the connection is closed or an unrecoverable error
103+
occurs.
104+
"""
105+
log.info("Starting PollingDataSourceV2 synchronizer")
106+
while True:
107+
result = self._requester.fetch(None)
108+
if isinstance(result, _Fail):
109+
if isinstance(result.exception, UnsuccessfulResponseException):
110+
error_info = DataSourceErrorInfo(
111+
kind=DataSourceErrorKind.ERROR_RESPONSE,
112+
status_code=result.exception.status,
113+
time=time(),
114+
message=http_error_message(
115+
result.exception.status, "polling request"
116+
),
117+
)
118+
119+
status_code = result.exception.status
120+
if is_http_error_recoverable(status_code):
121+
# TODO(fdv2): Add support for environment ID
122+
yield Update(
123+
state=DataSourceState.INTERRUPTED,
124+
error=error_info,
125+
)
126+
continue
127+
128+
# TODO(fdv2): Add support for environment ID
129+
yield Update(
130+
state=DataSourceState.OFF,
131+
error=error_info,
132+
)
133+
break
78134

79-
def _poll(self) -> Result: # Result[Basis]:
135+
error_info = DataSourceErrorInfo(
136+
kind=DataSourceErrorKind.NETWORK_ERROR,
137+
time=time(),
138+
status_code=0,
139+
message=result.error,
140+
)
141+
142+
# TODO(fdv2): Go has a designation here to handle JSON decoding separately.
143+
# TODO(fdv2): Add support for environment ID
144+
yield Update(
145+
state=DataSourceState.INTERRUPTED,
146+
error=error_info,
147+
)
148+
else:
149+
(change_set, headers) = result.value
150+
yield Update(
151+
state=DataSourceState.VALID,
152+
change_set=change_set,
153+
environment_id=headers.get("X-LD-EnvID"),
154+
)
155+
156+
if self._event.wait(self._poll_interval):
157+
break
158+
159+
def _poll(self) -> BasisResult:
80160
try:
81161
# TODO(fdv2): Need to pass the selector through
82162
result = self._requester.fetch(None)
@@ -90,10 +170,13 @@ def _poll(self) -> Result: # Result[Basis]:
90170
if is_http_error_recoverable(status_code):
91171
log.warning(http_error_message_result)
92172

93-
return Result.fail(http_error_message_result, result.exception)
173+
return _Fail(
174+
error=http_error_message_result, exception=result.exception
175+
)
94176

95-
return Result.fail(
96-
result.error or "Failed to request payload", result.exception
177+
return _Fail(
178+
error=result.error or "Failed to request payload",
179+
exception=result.exception,
97180
)
98181

99182
(change_set, headers) = result.value
@@ -108,18 +191,19 @@ def _poll(self) -> Result: # Result[Basis]:
108191
environment_id=env_id,
109192
)
110193

111-
return Result.success(basis)
112-
except Exception as e:
194+
return _Success(value=basis)
195+
except Exception as e: # pylint: disable=broad-except
113196
msg = f"Error: Exception encountered when updating flags. {e}"
114197
log.exception(msg)
115198

116-
return Result.fail(msg, e)
199+
return _Fail(error=msg, exception=e)
117200

118201

119202
# pylint: disable=too-few-public-methods
120203
class Urllib3PollingRequester:
121204
"""
122-
Urllib3PollingRequester is a PollingRequester that uses urllib3 to make HTTP requests.
205+
Urllib3PollingRequester is a Requester that uses urllib3 to make HTTP
206+
requests.
123207
"""
124208

125209
def __init__(self, config):

ldclient/impl/util.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,4 +260,6 @@ class _Fail(Generic[E]):
260260
exception: Optional[Exception] = None
261261

262262

263+
# TODO(breaking): Replace the above Result class with an improved generic
264+
# version.
263265
_Result = Union[_Success[T], _Fail[E]]

ldclient/testing/impl/datasourcev2/test_polling_initializer.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ def test_error_is_returned_on_failure():
3939

4040
result = ds.fetch()
4141

42-
assert result.value is None
43-
assert not result.is_success()
42+
assert isinstance(result, _Fail)
4443
assert result.error == "failure message"
4544
assert result.exception is None
4645

@@ -53,8 +52,7 @@ def test_error_is_recoverable():
5352

5453
result = ds.fetch()
5554

56-
assert result.value is None
57-
assert not result.is_success()
55+
assert isinstance(result, _Fail)
5856
assert result.error is not None
5957
assert result.error.startswith("Received HTTP error 408")
6058
assert isinstance(result.exception, UnsuccessfulResponseException)
@@ -68,8 +66,7 @@ def test_error_is_unrecoverable():
6866

6967
result = ds.fetch()
7068

71-
assert result.value is None
72-
assert not result.is_success()
69+
assert isinstance(result, _Fail)
7370
assert result.error is not None
7471
assert result.error.startswith("Received HTTP error 401")
7572
assert isinstance(result.exception, UnsuccessfulResponseException)
@@ -83,25 +80,21 @@ def test_handles_transfer_none():
8380

8481
result = ds.fetch()
8582

86-
assert result.is_success()
83+
assert isinstance(result, _Success)
8784
assert result.value is not None
8885

8986
assert result.value.change_set.intent_code == IntentCode.TRANSFER_NONE
9087
assert result.value.change_set.changes == []
9188
assert result.value.persist is False
9289

93-
assert result.error is None
94-
assert result.exception is None
95-
9690

9791
def test_handles_uncaught_exception():
9892
mock_requester = MockExceptionThrowingPollingRequester()
9993
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)
10094

10195
result = ds.fetch()
10296

103-
assert result.value is None
104-
assert not result.is_success()
97+
assert isinstance(result, _Fail)
10598
assert result.error is not None
10699
assert (
107100
result.error
@@ -120,16 +113,13 @@ def test_handles_transfer_full():
120113

121114
result = ds.fetch()
122115

123-
assert result.is_success()
116+
assert isinstance(result, _Success)
124117
assert result.value is not None
125118

126119
assert result.value.change_set.intent_code == IntentCode.TRANSFER_FULL
127120
assert len(result.value.change_set.changes) == 1
128121
assert result.value.persist is True
129122

130-
assert result.error is None
131-
assert result.exception is None
132-
133123

134124
def test_handles_transfer_changes():
135125
payload_str = '{"events":[{"event": "server-intent","data": {"payloads":[{"id":"5A46PZ79FQ9D08YYKT79DECDNV","target":462,"intentCode":"xfer-changes","reason":"stale"}]}},{"event": "put-object","data": {"key":"sample-feature","kind":"flag","version":462,"object":{"key":"sample-feature","on":true,"prerequisites":[],"targets":[],"contextTargets":[],"rules":[],"fallthrough":{"variation":0},"offVariation":1,"variations":[true,false],"clientSideAvailability":{"usingMobileKey":false,"usingEnvironmentId":false},"clientSide":false,"salt":"9945e63a79a44787805b79728fee1926","trackEvents":false,"trackEventsFallthrough":false,"debugEventsUntilDate":null,"version":113,"deleted":false}}},{"event": "payload-transferred","data": {"state":"(p:5A46PZ79FQ9D08YYKT79DECDNV:462)","id":"5A46PZ79FQ9D08YYKT79DECDNV","version":462}}]}'
@@ -141,12 +131,9 @@ def test_handles_transfer_changes():
141131

142132
result = ds.fetch()
143133

144-
assert result.is_success()
134+
assert isinstance(result, _Success)
145135
assert result.value is not None
146136

147137
assert result.value.change_set.intent_code == IntentCode.TRANSFER_CHANGES
148138
assert len(result.value.change_set.changes) == 1
149139
assert result.value.persist is True
150-
151-
assert result.error is None
152-
assert result.exception is None

0 commit comments

Comments
 (0)