Skip to content

Commit efe0351

Browse files
committed
chore: Create datasystem and related protocols
1 parent 5398878 commit efe0351

File tree

11 files changed

+785
-142
lines changed

11 files changed

+785
-142
lines changed

ldclient/impl/datasourcev2/__init__.py

Lines changed: 2 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -10,80 +10,6 @@
1010
You have been warned.
1111
"""
1212

13-
from abc import abstractmethod
14-
from dataclasses import dataclass
15-
from typing import Generator, Mapping, Optional, Protocol, Tuple
13+
from .polling import PollingResult, Requester
1614

17-
from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet
18-
from ldclient.impl.util import _Result
19-
from ldclient.interfaces import DataSourceErrorInfo, DataSourceState
20-
21-
PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
22-
23-
24-
BasisResult = _Result[Basis, str]
25-
26-
27-
class Initializer(Protocol): # pylint: disable=too-few-public-methods
28-
"""
29-
Initializer represents a component capable of retrieving a single data
30-
result, such as from the LD polling API.
31-
32-
The intent of initializers is to quickly fetch an initial set of data,
33-
which may be stale but is fast to retrieve. This initial data serves as a
34-
foundation for a Synchronizer to build upon, enabling it to provide updates
35-
as new changes occur.
36-
"""
37-
38-
@abstractmethod
39-
def fetch(self) -> BasisResult:
40-
"""
41-
sync should begin the synchronization process for the data source, yielding
42-
Update objects until the connection is closed or an unrecoverable error
43-
occurs.
44-
"""
45-
raise NotImplementedError
46-
47-
48-
@dataclass(frozen=True)
49-
class Update:
50-
"""
51-
Update represents the results of a synchronizer's ongoing sync
52-
method.
53-
"""
54-
55-
state: DataSourceState
56-
change_set: Optional[ChangeSet] = None
57-
error: Optional[DataSourceErrorInfo] = None
58-
revert_to_fdv1: bool = False
59-
environment_id: Optional[str] = None
60-
61-
62-
class Synchronizer(Protocol): # pylint: disable=too-few-public-methods
63-
"""
64-
Synchronizer represents a component capable of synchronizing data from an external
65-
data source, such as a streaming or polling API.
66-
67-
It is responsible for yielding Update objects that represent the current state
68-
of the data source, including any changes that have occurred since the last
69-
synchronization.
70-
"""
71-
72-
@abstractmethod
73-
def sync(self) -> Generator[Update, None, None]:
74-
"""
75-
sync should begin the synchronization process for the data source, yielding
76-
Update objects until the connection is closed or an unrecoverable error
77-
occurs.
78-
"""
79-
raise NotImplementedError
80-
81-
82-
__all__: list[str] = [
83-
# Initializer-related types
84-
"BasisResult",
85-
"Initializer",
86-
# Synchronizer-related types
87-
"Update",
88-
"Synchronizer",
89-
]
15+
__all__: list[str] = ["PollingResult", "Requester"]

ldclient/impl/datasourcev2/polling.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
from collections import namedtuple
99
from threading import Event
1010
from time import time
11-
from typing import Generator, Optional, Protocol
11+
from typing import Generator, Mapping, Optional, Protocol, Tuple
1212
from urllib import parse
1313

1414
import urllib3
1515

16-
from ldclient.impl.datasourcev2 import BasisResult, PollingResult, Update
16+
from ldclient.config import Config
17+
from ldclient.impl.datasystem import BasisResult, Update
1718
from ldclient.impl.datasystem.protocolv2 import (
1819
Basis,
1920
ChangeSet,
@@ -46,7 +47,7 @@
4647
POLLING_ENDPOINT = "/sdk/poll"
4748

4849

49-
CacheEntry = namedtuple("CacheEntry", ["data", "etag"])
50+
PollingResult = _Result[Tuple[ChangeSet, Mapping], str]
5051

5152

5253
class Requester(Protocol): # pylint: disable=too-few-public-methods
@@ -68,6 +69,9 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
6869
raise NotImplementedError
6970

7071

72+
CacheEntry = namedtuple("CacheEntry", ["data", "etag"])
73+
74+
7175
class PollingDataSource:
7276
"""
7377
PollingDataSource is a data source that can retrieve information from
@@ -206,7 +210,7 @@ class Urllib3PollingRequester:
206210
requests.
207211
"""
208212

209-
def __init__(self, config):
213+
def __init__(self, config: Config):
210214
self._etag = None
211215
self._http = _http_factory(config).create_pool_manager(1, config.base_uri)
212216
self._config = config
@@ -335,3 +339,30 @@ def polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]:
335339
)
336340

337341
return _Fail(error="didn't receive any known protocol events in polling payload")
342+
343+
344+
class PollingDataSourceBuilder:
345+
"""
346+
Builder for a PollingDataSource.
347+
"""
348+
349+
def __init__(self, config: Config):
350+
self._config = config
351+
self._requester = None
352+
353+
def requester(self, requester: Requester) -> "PollingDataSourceBuilder":
354+
"""Sets a custom Requester for the PollingDataSource."""
355+
self._requester = requester
356+
return self
357+
358+
def build(self) -> PollingDataSource:
359+
"""Builds the PollingDataSource with the configured parameters."""
360+
requester = (
361+
self._requester
362+
if self._requester is not None
363+
else Urllib3PollingRequester(self._config)
364+
)
365+
366+
return PollingDataSource(
367+
poll_interval=self._config.poll_interval, requester=requester
368+
)

ldclient/impl/datasourcev2/streaming.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from ld_eventsource.errors import HTTPStatusError
2020

2121
from ldclient.config import Config
22-
from ldclient.impl.datasourcev2 import Synchronizer, Update
22+
from ldclient.impl.datasystem import Synchronizer, Update
2323
from ldclient.impl.datasystem.protocolv2 import (
2424
ChangeSetBuilder,
2525
DeleteObject,
@@ -110,7 +110,7 @@ def create_sse_client(config: Config) -> SSEClientImpl:
110110
)
111111

112112

113-
class StreamingSynchronizer(Synchronizer):
113+
class StreamingDataSource(Synchronizer):
114114
"""
115115
StreamingSynchronizer is a specific type of Synchronizer that handles
116116
streaming data sources.
@@ -386,3 +386,17 @@ def __enter__(self):
386386
def __exit__(self, type, value, traceback):
387387
# self.stop()
388388
pass
389+
390+
391+
class StreamingDataSourceBuilder: # disable: pylint: disable=too-few-public-methods
392+
"""
393+
Builder for a StreamingDataSource.
394+
"""
395+
396+
def __init__(self, config: Config):
397+
self._config = config
398+
399+
def build(self) -> StreamingDataSource:
400+
"""Builds a StreamingDataSource instance with the configured parameters."""
401+
# TODO(fdv2): Add in the other controls here.
402+
return StreamingDataSource(self._config)

0 commit comments

Comments
 (0)