-
Notifications
You must be signed in to change notification settings - Fork 45
chore: Support x-ld-envid in updates #370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
383a396
203a033
cc6e742
d3cb296
254009b
7e6977d
38331d6
0fb7c36
a269e0f
94c2334
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,8 @@ | |
| ) | ||
| from ldclient.impl.http import HTTPFactory, _http_factory | ||
| from ldclient.impl.util import ( | ||
| _LD_ENVID_HEADER, | ||
| _LD_FD_FALLBACK_HEADER, | ||
| http_error_message, | ||
| is_http_error_recoverable, | ||
| log | ||
|
|
@@ -58,7 +60,6 @@ | |
|
|
||
| STREAMING_ENDPOINT = "/sdk/stream" | ||
|
|
||
|
|
||
| SseClientBuilder = Callable[[Config, SelectorStore], SSEClient] | ||
|
|
||
|
|
||
|
|
@@ -146,6 +147,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| self._running = True | ||
| self._connection_attempt_start_time = time() | ||
|
|
||
| envid = None | ||
| for action in self._sse.all: | ||
| if isinstance(action, Fault): | ||
| # If the SSE client detects the stream has closed, then it will | ||
|
|
@@ -154,7 +156,9 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| if action.error is None: | ||
| continue | ||
|
|
||
| (update, should_continue) = self._handle_error(action.error) | ||
| envid = action.headers.get(_LD_ENVID_HEADER) if action.headers is not None else None | ||
|
||
|
|
||
| (update, should_continue) = self._handle_error(action.error, envid) | ||
| if update is not None: | ||
| yield update | ||
|
|
||
|
|
@@ -163,20 +167,23 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| continue | ||
|
|
||
| if isinstance(action, Start) and action.headers is not None: | ||
| fallback = action.headers.get('X-LD-FD-Fallback') == 'true' | ||
| fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true' | ||
| envid = action.headers.get(_LD_ENVID_HEADER) | ||
|
|
||
| if fallback: | ||
| self._record_stream_init(True) | ||
| yield Update( | ||
| state=DataSourceState.OFF, | ||
| revert_to_fdv1=True | ||
| revert_to_fdv1=True, | ||
| environment_id=envid, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Start Action Leaves Environment State StaleWhen a |
||
| ) | ||
| break | ||
|
|
||
| if not isinstance(action, Event): | ||
| continue | ||
|
|
||
| try: | ||
| update = self._process_message(action, change_set_builder) | ||
| update = self._process_message(action, change_set_builder, envid) | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if update is not None: | ||
| self._record_stream_init(False) | ||
| self._connection_attempt_start_time = None | ||
|
|
@@ -187,7 +194,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| ) | ||
| self._sse.interrupt() | ||
|
|
||
| (update, should_continue) = self._handle_error(e) | ||
| (update, should_continue) = self._handle_error(e, envid) | ||
| if update is not None: | ||
| yield update | ||
| if not should_continue: | ||
|
|
@@ -204,7 +211,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| DataSourceErrorKind.UNKNOWN, 0, time(), str(e) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| self._sse.close() | ||
|
|
@@ -226,7 +233,7 @@ def _record_stream_init(self, failed: bool): | |
|
|
||
| # pylint: disable=too-many-return-statements | ||
| def _process_message( | ||
| self, msg: Event, change_set_builder: ChangeSetBuilder | ||
| self, msg: Event, change_set_builder: ChangeSetBuilder, envid: Optional[str] | ||
| ) -> Optional[Update]: | ||
| """ | ||
| Processes a single message from the SSE stream and returns an Update | ||
|
|
@@ -247,7 +254,7 @@ def _process_message( | |
| change_set_builder.expect_changes() | ||
| return Update( | ||
| state=DataSourceState.VALID, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| return None | ||
|
|
||
|
|
@@ -293,13 +300,13 @@ def _process_message( | |
| return Update( | ||
| state=DataSourceState.VALID, | ||
| change_set=change_set, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| log.info("Unexpected event found in stream: %s", msg.event) | ||
| return None | ||
|
|
||
| def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | ||
| def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optional[Update], bool]: | ||
| """ | ||
| This method handles errors that occur during the streaming process. | ||
|
|
||
|
|
@@ -328,7 +335,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| DataSourceErrorKind.INVALID_DATA, 0, time(), str(error) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| return (update, True) | ||
|
|
||
|
|
@@ -344,11 +351,15 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| str(error), | ||
| ) | ||
|
|
||
| if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true': | ||
| if envid is None and error.headers is not None: | ||
| envid = error.headers.get(_LD_ENVID_HEADER) | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if error.headers is not None and error.headers.get(_LD_FD_FALLBACK_HEADER) == 'true': | ||
| update = Update( | ||
| state=DataSourceState.OFF, | ||
| error=error_info, | ||
| revert_to_fdv1=True | ||
| revert_to_fdv1=True, | ||
| environment_id=envid, | ||
| ) | ||
| return (update, False) | ||
|
|
||
|
|
@@ -364,7 +375,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| ), | ||
| error=error_info, | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| if not is_recoverable: | ||
|
|
@@ -386,7 +397,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| DataSourceErrorKind.UNKNOWN, 0, time(), str(error) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| # no stacktrace here because, for a typical connection error, it'll | ||
| # just be a lengthy tour of urllib3 internals | ||
|
|
@@ -411,5 +422,4 @@ def __init__(self, config: Config): | |
|
|
||
| def build(self) -> StreamingDataSource: | ||
| """Builds a StreamingDataSource instance with the configured parameters.""" | ||
| # TODO(fdv2): Add in the other controls here. | ||
| return StreamingDataSource(self._config) | ||
Uh oh!
There was an error while loading. Please reload this page.