
Commit e5f866f

chore: Introducing Synchronizer protocol & streaming implementation

Parent: 0fd0afd

6 files changed: +1085 −58 lines

ldclient/impl/datasource/streaming.py

Lines changed: 91 additions & 33 deletions
@@ -14,6 +14,7 @@
 )
 from ld_eventsource.errors import HTTPStatusError
 
+from ldclient.impl.datasystem.protocolv2 import ChangeSet
 from ldclient.impl.http import HTTPFactory, _http_factory
 from ldclient.impl.util import (
     http_error_message,
@@ -36,9 +37,9 @@
 BACKOFF_RESET_INTERVAL = 60
 JITTER_RATIO = 0.5
 
-STREAM_ALL_PATH = '/all'
+STREAM_ALL_PATH = "/all"
 
-ParsedPath = namedtuple('ParsedPath', ['kind', 'key'])
+ParsedPath = namedtuple("ParsedPath", ["kind", "key"])
 
 
 class StreamingUpdateProcessor(Thread, UpdateProcessor):
@@ -47,7 +48,7 @@ def __init__(self, config, store, ready, diagnostic_accumulator):
         self.daemon = True
         self._uri = config.stream_base_uri + STREAM_ALL_PATH
         if config.payload_filter_key is not None:
-            self._uri += '?%s' % parse.urlencode({'filter': config.payload_filter_key})
+            self._uri += "?%s" % parse.urlencode({"filter": config.payload_filter_key})
         self._config = config
         self._data_source_update_sink = config.data_source_update_sink
         self._store = store
@@ -61,31 +62,42 @@ def run(self):
         self._running = True
         self._sse = self._create_sse_client()
         self._connection_attempt_start_time = time.time()
+
         for action in self._sse.all:
             if isinstance(action, Event):
-                message_ok = False
+                change_set: Optional[ChangeSet] = None
                 try:
-                    message_ok = self._process_message(self._sink_or_store(), action)
+                    change_set = self._process_message(self._sink_or_store(), action)
                 except json.decoder.JSONDecodeError as e:
-                    log.info("Error while handling stream event; will restart stream: %s" % e)
+                    log.info(
+                        "Error while handling stream event; will restart stream: %s" % e
+                    )
                     self._sse.interrupt()
 
                     self._handle_error(e)
                 except Exception as e:
-                    log.info("Error while handling stream event; will restart stream: %s" % e)
+                    log.info(
+                        "Error while handling stream event; will restart stream: %s" % e
+                    )
                     self._sse.interrupt()
 
                     if self._data_source_update_sink is not None:
-                        error_info = DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time(), str(e))
+                        error_info = DataSourceErrorInfo(
+                            DataSourceErrorKind.UNKNOWN, 0, time.time(), str(e)
+                        )
 
-                        self._data_source_update_sink.update_status(DataSourceState.INTERRUPTED, error_info)
+                        self._data_source_update_sink.update_status(
+                            DataSourceState.INTERRUPTED, error_info
+                        )
 
-                if message_ok:
+                if change_set is not None:
                     self._record_stream_init(False)
                     self._connection_attempt_start_time = None
 
                     if self._data_source_update_sink is not None:
-                        self._data_source_update_sink.update_status(DataSourceState.VALID, None)
+                        self._data_source_update_sink.update_status(
+                            DataSourceState.VALID, None
+                        )
 
                     if not self._ready.is_set():
                         log.info("StreamingUpdateProcessor initialized ok.")
@@ -104,19 +116,32 @@ def _record_stream_init(self, failed: bool):
         if self._diagnostic_accumulator and self._connection_attempt_start_time:
             current_time = int(time.time() * 1000)
             elapsed = current_time - int(self._connection_attempt_start_time * 1000)
-            self._diagnostic_accumulator.record_stream_init(current_time, elapsed if elapsed >= 0 else 0, failed)
+            self._diagnostic_accumulator.record_stream_init(
+                current_time, elapsed if elapsed >= 0 else 0, failed
+            )
 
     def _create_sse_client(self) -> SSEClient:
         # We don't want the stream to use the same read timeout as the rest of the SDK.
         http_factory = _http_factory(self._config)
-        stream_http_factory = HTTPFactory(http_factory.base_headers, http_factory.http_config, override_read_timeout=stream_read_timeout)
+        stream_http_factory = HTTPFactory(
+            http_factory.base_headers,
+            http_factory.http_config,
+            override_read_timeout=stream_read_timeout,
+        )
         return SSEClient(
             connect=ConnectStrategy.http(
-                url=self._uri, headers=http_factory.base_headers, pool=stream_http_factory.create_pool_manager(1, self._uri), urllib3_request_options={"timeout": stream_http_factory.timeout}
+                url=self._uri,
+                headers=http_factory.base_headers,
+                pool=stream_http_factory.create_pool_manager(1, self._uri),
+                urllib3_request_options={"timeout": stream_http_factory.timeout},
            ),
             error_strategy=ErrorStrategy.always_continue(),  # we'll make error-handling decisions when we see a Fault
             initial_retry_delay=self._config.initial_reconnect_delay,
-            retry_delay_strategy=RetryDelayStrategy.default(max_delay=MAX_RETRY_DELAY, backoff_multiplier=2, jitter_multiplier=JITTER_RATIO),
+            retry_delay_strategy=RetryDelayStrategy.default(
+                max_delay=MAX_RETRY_DELAY,
+                backoff_multiplier=2,
+                jitter_multiplier=JITTER_RATIO,
+            ),
             retry_delay_reset_threshold=BACKOFF_RESET_INTERVAL,
             logger=log,
         )
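For context on the reconnect parameters above (initial_retry_delay, backoff_multiplier=2, JITTER_RATIO=0.5, reset after BACKOFF_RESET_INTERVAL=60s of stability): a minimal sketch of exponential backoff with jitter, assuming RetryDelayStrategy.default doubles the base delay per attempt, caps it at max_delay, and randomizes each delay by the jitter ratio; the real ld-eventsource strategy may differ in detail:

import random

def retry_delay_sketch(initial: float, max_delay: float, attempts: int,
                       backoff_multiplier: float = 2.0, jitter_ratio: float = 0.5) -> list[float]:
    # Illustrative only; not the real RetryDelayStrategy.
    delays = []
    base = initial
    for _ in range(attempts):
        # randomize by up to jitter_ratio of the current base delay
        delays.append(base - random.random() * jitter_ratio * base)
        base = min(base * backoff_multiplier, max_delay)
    return delays

# retry_delay_sketch(1.0, 30.0, 6) -> e.g. [0.72, 1.53, 3.97, 4.31, 12.78, 25.10]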
@@ -142,39 +167,54 @@ def _sink_or_store(self):
         return self._data_source_update_sink
 
     def initialized(self):
-        return self._running and self._ready.is_set() is True and self._store.initialized is True
+        return (
+            self._running
+            and self._ready.is_set() is True
+            and self._store.initialized is True
+        )
 
     # Returns True if we initialized the feature store
     def _process_message(self, store, msg: Event) -> bool:
-        if msg.event == 'put':
+        if msg.event == "put":
             all_data = json.loads(msg.data)
-            init_data = {FEATURES: all_data['data']['flags'], SEGMENTS: all_data['data']['segments']}
-            log.debug("Received put event with %d flags and %d segments", len(init_data[FEATURES]), len(init_data[SEGMENTS]))
+            init_data = {
+                FEATURES: all_data["data"]["flags"],
+                SEGMENTS: all_data["data"]["segments"],
+            }
+            log.debug(
+                "Received put event with %d flags and %d segments",
+                len(init_data[FEATURES]),
+                len(init_data[SEGMENTS]),
+            )
             store.init(init_data)
             return True
-        elif msg.event == 'patch':
+        elif msg.event == "patch":
             payload = json.loads(msg.data)
-            path = payload['path']
-            obj = payload['data']
-            log.debug("Received patch event for %s, New version: [%d]", path, obj.get("version"))
+            path = payload["path"]
+            obj = payload["data"]
+            log.debug(
+                "Received patch event for %s, New version: [%d]",
+                path,
+                obj.get("version"),
+            )
             target = StreamingUpdateProcessor._parse_path(path)
             if target is not None:
                 store.upsert(target.kind, obj)
             else:
                 log.warning("Patch for unknown path: %s", path)
-        elif msg.event == 'delete':
+        elif msg.event == "delete":
             payload = json.loads(msg.data)
-            path = payload['path']
+            path = payload["path"]
             # noinspection PyShadowingNames
-            version = payload['version']
+            version = payload["version"]
             log.debug("Received delete event for %s, New version: [%d]", path, version)
             target = StreamingUpdateProcessor._parse_path(path)
             if target is not None:
                 store.delete(target.kind, target.key, version)
             else:
                 log.warning("Delete for unknown path: %s", path)
         else:
-            log.warning('Unhandled event in stream processor: ' + msg.event)
+            log.warning("Unhandled event in stream processor: " + msg.event)
         return False
 
     # Returns true to continue, false to stop
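_process_message relies on _parse_path (unchanged in this commit, hence not shown in the hunk) to map a stream path onto the ParsedPath namedtuple defined near the top of the file. A sketch of that convention, assuming patch/delete paths of the form /flags/<key> and /segments/<key>; the real method resolves the prefixes to the SDK's FEATURES/SEGMENTS kinds rather than plain strings:

from collections import namedtuple
from typing import Optional

ParsedPath = namedtuple("ParsedPath", ["kind", "key"])

def parse_path_sketch(path: str) -> Optional[ParsedPath]:
    # Illustrative stand-in for StreamingUpdateProcessor._parse_path.
    for prefix, kind in (("/flags/", "features"), ("/segments/", "segments")):
        if path.startswith(prefix):
            return ParsedPath(kind=kind, key=path[len(prefix):])
    return None  # unknown path; the caller logs a warning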
@@ -183,21 +223,32 @@ def _handle_error(self, error: Exception) -> bool:
             return False  # don't retry if we've been deliberately stopped
 
         if isinstance(error, json.decoder.JSONDecodeError):
-            error_info = DataSourceErrorInfo(DataSourceErrorKind.INVALID_DATA, 0, time.time(), str(error))
+            error_info = DataSourceErrorInfo(
+                DataSourceErrorKind.INVALID_DATA, 0, time.time(), str(error)
+            )
 
             log.error("Unexpected error on stream connection: %s, will retry" % error)
             self._record_stream_init(True)
             self._connection_attempt_start_time = None
 
             if self._data_source_update_sink is not None:
-                self._data_source_update_sink.update_status(DataSourceState.INTERRUPTED, error_info)
+                self._data_source_update_sink.update_status(
+                    DataSourceState.INTERRUPTED, error_info
+                )
         elif isinstance(error, HTTPStatusError):
             self._record_stream_init(True)
             self._connection_attempt_start_time = None
 
-            error_info = DataSourceErrorInfo(DataSourceErrorKind.ERROR_RESPONSE, error.status, time.time(), str(error))
+            error_info = DataSourceErrorInfo(
+                DataSourceErrorKind.ERROR_RESPONSE,
+                error.status,
+                time.time(),
+                str(error),
+            )
 
-            http_error_message_result = http_error_message(error.status, "stream connection")
+            http_error_message_result = http_error_message(
+                error.status, "stream connection"
+            )
             if not is_http_error_recoverable(error.status):
                 log.error(http_error_message_result)
                 self._ready.set()  # if client is initializing, make it stop waiting; has no effect if already inited
@@ -208,14 +259,21 @@ def _handle_error(self, error: Exception) -> bool:
                 log.warning(http_error_message_result)
 
                 if self._data_source_update_sink is not None:
-                    self._data_source_update_sink.update_status(DataSourceState.INTERRUPTED, error_info)
+                    self._data_source_update_sink.update_status(
+                        DataSourceState.INTERRUPTED, error_info
+                    )
         else:
             log.warning("Unexpected error on stream connection: %s, will retry" % error)
             self._record_stream_init(True)
             self._connection_attempt_start_time = None
 
             if self._data_source_update_sink is not None:
-                self._data_source_update_sink.update_status(DataSourceState.INTERRUPTED, DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time(), str(error)))
+                self._data_source_update_sink.update_status(
+                    DataSourceState.INTERRUPTED,
+                    DataSourceErrorInfo(
+                        DataSourceErrorKind.UNKNOWN, 0, time.time(), str(error)
+                    ),
+                )
         # no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals
         self._connection_attempt_start_time = time.time() + self._sse.next_retry_delay
         return True
Lines changed: 73 additions & 0 deletions
@@ -1,6 +1,79 @@
 """
 This module houses FDv2 types and implementations of synchronizers and
 initializers for the datasystem.
+
+All types and implementations in this module are considered internal
+and are not part of the public API of the LaunchDarkly Python SDK.
+They are subject to change without notice and should not be used directly
+by users of the SDK.
+
+You have been warned.
 """
 
+from abc import abstractmethod
+from dataclasses import dataclass
+from typing import Iterable, Mapping, Optional, Protocol, Tuple
+
+from ldclient.impl.datasystem.protocolv2 import ChangeSet, Selector
+from ldclient.impl.util import _Result
+from ldclient.interfaces import DataSourceErrorInfo, DataSourceState
+
+PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
+
+
+class PollingRequester(Protocol):  # pylint: disable=too-few-public-methods
+    """
+    PollingRequester allows PollingDataSource to delegate fetching data to
+    another component.
+
+    This is useful for testing the PollingDataSource without needing to set up
+    a test HTTP server.
+    """
+
+    @abstractmethod
+    def fetch(self, selector: Optional[Selector]) -> PollingResult:
+        """
+        Fetches the data for the given selector.
+        Returns a Result containing a tuple of ChangeSet and any request headers,
+        or an error if the data could not be retrieved.
+        """
+        raise NotImplementedError
+
+
+@dataclass(frozen=True)
+class Update:
+    """
+    Update represents the results of a synchronizer's ongoing sync
+    method.
+    """
+
+    state: DataSourceState
+    change_set: Optional[ChangeSet] = None
+    error: Optional[DataSourceErrorInfo] = None
+    revert_to_fdv1: bool = False
+    # NOTE: I hate this. Let's make this some sort of metadata object so we can
+    # retrieve all sorts of information.
+    environment_id: Optional[str] = None
+
+
+class Synchronizer(Protocol):  # pylint: disable=too-few-public-methods
+    """
+    Synchronizer represents a component capable of synchronizing data from an external
+    data source, such as a streaming or polling API.
+
+    It is responsible for yielding Update objects that represent the current state
+    of the data source, including any changes that have occurred since the last
+    synchronization.
+    """
+
+    @abstractmethod
+    def sync(self) -> Iterable[Update]:
+        """
+        sync should begin the synchronization process for the data source, yielding
+        Update objects until the connection is closed or an unrecoverable error
+        occurs.
+        """
+        raise NotImplementedError
+
+
 __all__: list[str] = []
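Both PollingRequester and Synchronizer are typing.Protocol classes, so conformance is structural: any object with a matching fetch or sync method satisfies them, with no inheritance required. A minimal sketch of a test-double synchronizer that replays canned updates, assuming Update and Synchronizer are importable from this module (polling.py below pulls them from ldclient.impl.datasourcev2); FakeSynchronizer itself is hypothetical, not part of the SDK:

from typing import Iterable, List

from ldclient.interfaces import DataSourceState

class FakeSynchronizer:
    """Structurally satisfies Synchronizer; no base class needed."""

    def __init__(self, updates: List[Update]):
        self._updates = updates

    def sync(self) -> Iterable[Update]:
        # A real synchronizer would hold a connection open and yield one
        # Update per server event; this double just replays canned ones.
        yield from self._updates

source: Synchronizer = FakeSynchronizer([Update(state=DataSourceState.VALID)])
for update in source.sync():
    assert update.change_set is None  # this canned update carries no data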

ldclient/impl/datasourcev2/polling.py

Lines changed: 4 additions & 25 deletions
@@ -1,16 +1,16 @@
 """
-Default implementation of the polling synchronizer and initializer.
+This module contains the implementations of a polling synchronizer and
+initializer, along with any required supporting classes and protocols.
 """
 
 import json
-from abc import abstractmethod
 from collections import namedtuple
-from collections.abc import Mapping
-from typing import Optional, Protocol, Tuple
+from typing import Iterable, Optional
 from urllib import parse
 
 import urllib3
 
+from ldclient.impl.datasourcev2 import PollingRequester, PollingResult, Update
 from ldclient.impl.datasystem.protocolv2 import (
     Basis,
     ChangeSet,
@@ -38,27 +38,6 @@
 
 POLLING_ENDPOINT = "/sdk/poll"
 
-PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
-
-
-class PollingRequester(Protocol):  # pylint: disable=too-few-public-methods
-    """
-    PollingRequester allows PollingDataSource to delegate fetching data to
-    another component.
-
-    This is useful for testing the PollingDataSource without needing to set up
-    a test HTTP server.
-    """
-
-    @abstractmethod
-    def fetch(self, selector: Optional[Selector]) -> PollingResult:
-        """
-        Fetches the data for the given selector.
-        Returns a Result containing a tuple of ChangeSet and any request headers,
-        or an error if the data could not be retrieved.
-        """
-        raise NotImplementedError
-
 
 CacheEntry = namedtuple("CacheEntry", ["data", "etag"])
 
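With PollingRequester relocated to the package (see the import added above), a test can hand the polling data source a canned requester instead of standing up an HTTP server, exactly as the protocol's docstring suggests. A sketch; CannedRequester is hypothetical and simply returns whatever PollingResult the test prepared:

from typing import List, Optional

from ldclient.impl.datasourcev2 import PollingRequester, PollingResult
from ldclient.impl.datasystem.protocolv2 import Selector

class CannedRequester:
    """Structurally satisfies PollingRequester for tests."""

    def __init__(self, result: PollingResult):
        self._result = result
        self.calls: List[Optional[Selector]] = []

    def fetch(self, selector: Optional[Selector]) -> PollingResult:
        # Record what the data source asked for, then return the canned result.
        self.calls.append(selector)
        return self._result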