Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ldclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ class DataSystemConfig:
data_store: Optional[FeatureStore] = None
"""The (optional) persistent data store instance."""

# TODO(fdv2): Implement this synchronizer up and hook it up everywhere.
# TODO(fdv2): Remove this when FDv2 is fully launched
fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
"""An optional fallback synchronizer that will read from FDv1"""

Expand Down
131 changes: 130 additions & 1 deletion ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import urllib3

from ldclient.config import Config
from ldclient.impl.datasource.feature_requester import LATEST_ALL_URI
from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
from ldclient.impl.datasystem.protocolv2 import (
Basis,
Expand All @@ -22,6 +23,8 @@
DeleteObject,
EventName,
IntentCode,
ObjectKind,
Payload,
PutObject,
Selector,
ServerIntent
Expand All @@ -43,6 +46,7 @@
DataSourceErrorKind,
DataSourceState
)
from ldclient.versioned_data_kind import FEATURES, SEGMENTS

POLLING_ENDPOINT = "/sdk/poll"

Expand Down Expand Up @@ -123,6 +127,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
),
)

fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true'
if fallback:
yield Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
)
break

status_code = result.exception.status
if is_http_error_recoverable(status_code):
# TODO(fdv2): Add support for environment ID
Expand Down Expand Up @@ -158,6 +171,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
state=DataSourceState.VALID,
change_set=change_set,
environment_id=headers.get("X-LD-EnvID"),
revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true'
)

if self._event.wait(self._poll_interval):
Expand Down Expand Up @@ -262,7 +276,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:

if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status)
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
)

headers = response.headers
Expand Down Expand Up @@ -375,3 +389,118 @@ def build(self) -> PollingDataSource:
return PollingDataSource(
poll_interval=self._config.poll_interval, requester=requester
)


# pylint: disable=too-few-public-methods
class Urllib3FDv1PollingRequester:
"""
Urllib3PollingRequesterFDv1 is a Requester that uses urllib3 to make HTTP
requests.
"""

def __init__(self, config: Config):
self._etag = None
self._http = _http_factory(config).create_pool_manager(1, config.base_uri)
self._config = config
self._poll_uri = config.base_uri + LATEST_ALL_URI

def fetch(self, selector: Optional[Selector]) -> PollingResult:
"""
Fetches the data for the given selector.
Returns a Result containing a tuple of ChangeSet and any request headers,
or an error if the data could not be retrieved.
"""
query_params = {}
if self._config.payload_filter_key is not None:
query_params["filter"] = self._config.payload_filter_key

uri = self._poll_uri
if len(query_params) > 0:
filter_query = parse.urlencode(query_params)
uri += f"?{filter_query}"

hdrs = _headers(self._config)
hdrs["Accept-Encoding"] = "gzip"

if self._etag is not None:
hdrs["If-None-Match"] = self._etag

response = self._http.request(
"GET",
uri,
headers=hdrs,
timeout=urllib3.Timeout(
connect=self._config.http.connect_timeout,
read=self._config.http.read_timeout,
),
retries=1,
)

if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
)

headers = response.headers

if response.status == 304:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way, when changing data sources, that this etag could be a problem for us?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like the our payload in the store doesn't have to correspond to the to this etag if there was some data source in the interim. But I think that would be, at worse, a delay and it would require some inconsistent back-end state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we switch over to the polling mode, the etag is empty, ensuring the first request is going to get the full payload. The header from then is only used within the context of this class, I don't think that's problematic. Additionally, when you fallback to FDv1, you don't recover from that. This only happens if FD triggers the fallback, and that's only if something is pretty wrong.

return _Success(value=(ChangeSetBuilder.no_changes(), headers))

data = json.loads(response.data.decode("UTF-8"))
etag = headers.get("ETag")

if etag is not None:
self._etag = etag

log.debug(
"%s response status:[%d] ETag:[%s]",
uri,
response.status,
etag,
)

changeset_result = fdv1_polling_payload_to_changeset(data)
if isinstance(changeset_result, _Success):
return _Success(value=(changeset_result.value, headers))

return _Fail(
error=changeset_result.error,
exception=changeset_result.exception,
)


# pylint: disable=too-many-branches,too-many-return-statements
def fdv1_polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]:
"""
Converts a fdv1 polling payload into a ChangeSet.
"""
builder = ChangeSetBuilder()
builder.start(IntentCode.TRANSFER_FULL)
selector = Selector.no_selector()

# FDv1 uses "flags" instead of "features", so we need to map accordingly
# Map FDv1 JSON keys to ObjectKind enum values
kind_mappings = [
(ObjectKind.FLAG, "flags"),
(ObjectKind.SEGMENT, "segments")
]

for kind, fdv1_key in kind_mappings:
kind_data = data.get(fdv1_key)
if kind_data is None:
continue
if not isinstance(kind_data, dict):
return _Fail(error=f"Invalid format: {fdv1_key} is not a dictionary")

for key in kind_data:
flag_or_segment = kind_data.get(key)
if flag_or_segment is None or not isinstance(flag_or_segment, dict):
return _Fail(error=f"Invalid format: {key} is not a dictionary")

version = flag_or_segment.get('version')
if version is None:
return _Fail(error=f"Invalid format: {key} does not have a version set")

builder.add_put(kind, key, version, flag_or_segment)

return _Success(builder.finish(selector))
31 changes: 21 additions & 10 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
"""

import json
from abc import abstractmethod
from time import time
from typing import Callable, Generator, Iterable, Optional, Protocol, Tuple
from typing import Callable, Generator, Optional, Tuple
from urllib import parse

from ld_eventsource import SSEClient
from ld_eventsource.actions import Action, Event, Fault
from ld_eventsource.actions import Event, Fault, Start
from ld_eventsource.config import (
ConnectStrategy,
ErrorStrategy,
Expand Down Expand Up @@ -151,6 +150,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
break
continue

if isinstance(action, Start) and action.headers is not None:
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
if fallback:
yield Update(
state=DataSourceState.OFF,
revert_to_fdv1=True
)
break

if not isinstance(action, Event):
continue

Expand Down Expand Up @@ -188,11 +196,6 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
# if update is not None:
# self._record_stream_init(False)

# if self._data_source_update_sink is not None:
# self._data_source_update_sink.update_status(
# DataSourceState.VALID, None
# )

self._sse.close()

def stop(self):
Expand Down Expand Up @@ -288,6 +291,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:

If an update is provided, it should be forward upstream, regardless of
whether or not we are going to retry this failure.

The return should be thought of (update, should_continue)
"""
if not self._running:
return (None, False) # don't retry if we've been deliberately stopped
Expand Down Expand Up @@ -315,12 +320,18 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
str(error),
)

if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true':
update = Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
)
return (update, False)

http_error_message_result = http_error_message(
error.status, "stream connection"
)

is_recoverable = is_http_error_recoverable(error.status)

update = Update(
state=(
DataSourceState.INTERRUPTED
Expand Down
29 changes: 29 additions & 0 deletions ldclient/impl/datasystem/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ldclient.impl.datasourcev2.polling import (
PollingDataSource,
PollingDataSourceBuilder,
Urllib3FDv1PollingRequester,
Urllib3PollingRequester
)
from ldclient.impl.datasourcev2.streaming import (
Expand Down Expand Up @@ -55,6 +56,17 @@ def synchronizers(
self._secondary_synchronizer = secondary
return self

def fdv1_compatible_synchronizer(
self,
fallback: Builder[Synchronizer]
) -> "ConfigBuilder":
"""
Configures the SDK with a fallback synchronizer that is compatible with
the Flag Delivery v1 API.
"""
self._fdv1_fallback_synchronizer = fallback
return self

def data_store(self, data_store: FeatureStore, store_mode: DataStoreMode) -> "ConfigBuilder":
"""
Sets the data store configuration for the data system.
Expand Down Expand Up @@ -91,6 +103,17 @@ def builder(config: LDConfig) -> PollingDataSource:
return builder


def fdv1_fallback_ds_builder() -> Builder[PollingDataSource]:
def builder(config: LDConfig) -> PollingDataSource:
requester = Urllib3FDv1PollingRequester(config)
polling_ds = PollingDataSourceBuilder(config)
polling_ds.requester(requester)

return polling_ds.build()

return builder


def streaming_ds_builder() -> Builder[StreamingDataSource]:
def builder(config: LDConfig) -> StreamingDataSource:
return StreamingDataSourceBuilder(config).build()
Expand All @@ -114,10 +137,12 @@ def default() -> ConfigBuilder:

polling_builder = polling_ds_builder()
streaming_builder = streaming_ds_builder()
fallback = fdv1_fallback_ds_builder()

builder = ConfigBuilder()
builder.initializers([polling_builder])
builder.synchronizers(streaming_builder, polling_builder)
builder.fdv1_compatible_synchronizer(fallback)

return builder

Expand All @@ -130,9 +155,11 @@ def streaming() -> ConfigBuilder:
"""

streaming_builder = streaming_ds_builder()
fallback = fdv1_fallback_ds_builder()

builder = ConfigBuilder()
builder.synchronizers(streaming_builder)
builder.fdv1_compatible_synchronizer(fallback)

return builder

Expand All @@ -145,9 +172,11 @@ def polling() -> ConfigBuilder:
"""

polling_builder: Builder[Synchronizer] = polling_ds_builder()
fallback = fdv1_fallback_ds_builder()

builder = ConfigBuilder()
builder.synchronizers(polling_builder)
builder.fdv1_compatible_synchronizer(fallback)

return builder

Expand Down
6 changes: 5 additions & 1 deletion ldclient/impl/datasystem/fdv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,13 @@ def _consume_synchronizer_results(
# Update status
self._data_source_status_provider.update_status(update.state, update.error)

# Check if we should revert to FDv1 immediately
if update.revert_to_fdv1:
return True, True

# Check for OFF state indicating permanent failure
if update.state == DataSourceState.OFF:
return True, update.revert_to_fdv1
return True, False

# Check condition periodically
current_status = self._data_source_status_provider.status
Expand Down
Loading