-
Notifications
You must be signed in to change notification settings - Fork 45
chore: Add FDv2-compatible contract test support #362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
8ccfd3e to
77b1e82
Compare
e55dca5 to
9c3cd49
Compare
| Stops the streaming synchronizer, closing any open connections. | ||
| """ | ||
| log.info("Stopping StreamingUpdateProcessor") | ||
| self._running = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume no threading concerns in this implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe there should be. The data system starts it's own thread which creates and destroys synchronizers synchronously.
<!-- CURSOR_SUMMARY --> > [!NOTE] > Introduces FDv2 data system configuration and robust synchronizer lifecycle (start/stop), integrates SSE streaming with payload filter, implements an in-memory feature store, and updates tests and contract client accordingly. > > - **FDv2/Data System**: > - Add `Synchronizer.stop()` to interface and implement stop/lifecycle management in `StreamingDataSource` and `PollingDataSource`. > - Enhance `FDv2` to track/stop the active synchronizer safely with locks; ensure threads shut down cleanly. > - Add `datasystem.config` builders (`polling_ds_builder`, `streaming_ds_builder`), expose `fdv1_fallback_synchronizer` in config. > - **Streaming**: > - Switch to `ld_eventsource.SSEClient`; include payload filter in stream URI. > - Handle stream errors by interrupting/closing SSE; stop on unrecoverable errors; ensure closure on exit. > - **Polling**: > - Add stoppable sync loop with `_stop` flag and `stop()` method. > - **Store**: > - Implement thread-safe `InMemoryFeatureStore` with basic CRUD, init, and diagnostics; integrate with `Store`. > - **Contract tests**: > - Support FDv2 `dataSystem` config (initializers/synchronizers, payloadFilter) in `client_entity.py`. > - **Tests**: > - Update streaming synchronizer tests for new SSE client usage and stop/interrupt behavior. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit e87daa0. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
<!-- CURSOR_SUMMARY --> > [!NOTE] > Introduces FDv2 data system configuration and robust synchronizer lifecycle (start/stop), integrates SSE streaming with payload filter, implements an in-memory feature store, and updates tests and contract client accordingly. > > - **FDv2/Data System**: > - Add `Synchronizer.stop()` to interface and implement stop/lifecycle management in `StreamingDataSource` and `PollingDataSource`. > - Enhance `FDv2` to track/stop the active synchronizer safely with locks; ensure threads shut down cleanly. > - Add `datasystem.config` builders (`polling_ds_builder`, `streaming_ds_builder`), expose `fdv1_fallback_synchronizer` in config. > - **Streaming**: > - Switch to `ld_eventsource.SSEClient`; include payload filter in stream URI. > - Handle stream errors by interrupting/closing SSE; stop on unrecoverable errors; ensure closure on exit. > - **Polling**: > - Add stoppable sync loop with `_stop` flag and `stop()` method. > - **Store**: > - Implement thread-safe `InMemoryFeatureStore` with basic CRUD, init, and diagnostics; integrate with `Store`. > - **Contract tests**: > - Support FDv2 `dataSystem` config (initializers/synchronizers, payloadFilter) in `client_entity.py`. > - **Tests**: > - Update streaming synchronizer tests for new SSE client usage and stop/interrupt behavior. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit e87daa0. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
Note
Introduces FDv2 data system configuration and robust synchronizer lifecycle (start/stop), integrates SSE streaming with payload filter, implements an in-memory feature store, and updates tests and contract client accordingly.
Synchronizer.stop()to interface and implement stop/lifecycle management inStreamingDataSourceandPollingDataSource.FDv2to track/stop the active synchronizer safely with locks; ensure threads shut down cleanly.datasystem.configbuilders (polling_ds_builder,streaming_ds_builder), exposefdv1_fallback_synchronizerin config.ld_eventsource.SSEClient; include payload filter in stream URI._stopflag andstop()method.InMemoryFeatureStorewith basic CRUD, init, and diagnostics; integrate withStore.dataSystemconfig (initializers/synchronizers, payloadFilter) inclient_entity.py.Written by Cursor Bugbot for commit e87daa0. This will update automatically on new commits. Configure here.