diff --git a/ldclient/impl/integrations/files/file_data_sourcev2.py b/ldclient/impl/integrations/files/file_data_sourcev2.py
new file mode 100644
index 00000000..c8e152b7
--- /dev/null
+++ b/ldclient/impl/integrations/files/file_data_sourcev2.py
@@ -0,0 +1,428 @@
+import json
+import os
+import threading
+import traceback
+from queue import Empty, Queue
+from typing import Generator
+
+from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
+from ldclient.impl.datasystem.protocolv2 import (
+    Basis,
+    ChangeSetBuilder,
+    IntentCode,
+    ObjectKind,
+    Selector
+)
+from ldclient.impl.repeating_task import RepeatingTask
+from ldclient.impl.util import _Fail, _Success, current_time_millis, log
+from ldclient.interfaces import (
+    DataSourceErrorInfo,
+    DataSourceErrorKind,
+    DataSourceState
+)
+
+have_yaml = False
+try:
+    import yaml
+    have_yaml = True
+except ImportError:
+    pass
+
+have_watchdog = False
+try:
+    import watchdog
+    import watchdog.events
+    import watchdog.observers
+    have_watchdog = True
+except ImportError:
+    pass
+
+
+def _sanitize_json_item(item):
+    if 'version' not in item:
+        item['version'] = 1
+
+
+class _FileDataSourceV2:
+    """
+    Internal implementation of both Initializer and Synchronizer protocols for file-based data.
+
+    This type is not stable and is not subject to any backwards
+    compatibility guarantees or semantic versioning. It is not suitable for production usage.
+
+    Do not use it.
+    You have been warned.
+
+    This component reads feature flag and segment data from local files and provides them
+    via the FDv2 protocol interfaces. Each instance implements both Initializer and Synchronizer
+    protocols:
+
+    - As an Initializer: reads files once and returns initial data
+    - As a Synchronizer: watches for file changes and yields updates
+
+    The files use the same format as the v1 file data source, supporting flags, flagValues,
+    and segments in JSON or YAML format.
+    """
+
+    def __init__(self, paths, poll_interval=1, force_polling=False):
+        """
+        Initialize the file data source.
+
+        :param paths: list of file paths to load (or a single path string)
+        :param poll_interval: seconds between polling checks when watching files (default: 1)
+        :param force_polling: force polling even if watchdog is available (default: False)
+        """
+        self._paths = paths if isinstance(paths, list) else [paths]
+        self._poll_interval = poll_interval
+        self._force_polling = force_polling
+        self._closed = False
+        self._update_queue = Queue()
+        self._lock = threading.Lock()
+        self._auto_updater = None
+
+    @property
+    def name(self) -> str:
+        """Return the name of this data source."""
+        return "FileDataV2"
+
+    def fetch(self, ss: SelectorStore) -> BasisResult:
+        """
+        Implementation of the Initializer.fetch method.
+
+        Reads all configured files once and returns their contents as a Basis.
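+
+        A minimal caller-side sketch (``selector_store`` stands in for any
+        SelectorStore implementation; direct construction is shown for
+        illustration only, since production wiring goes through
+        ``Files.new_data_source_v2``)::
+
+            source = _FileDataSourceV2(['my_flags.json'])
+            result = source.fetch(selector_store)
+            if isinstance(result, _Success):
+                basis = result.value
+                # basis.change_set carries a TRANSFER_FULL set of flags/segments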
+
+        :param ss: SelectorStore (not used, as we don't have selectors for file data)
+        :return: BasisResult containing the file data or an error
+        """
+        try:
+            with self._lock:
+                if self._closed:
+                    return _Fail("FileDataV2 source has been closed")
+
+                # Load all files and build changeset
+                result = self._load_all_to_changeset()
+                if isinstance(result, _Fail):
+                    return result
+
+                change_set = result.value
+
+                basis = Basis(
+                    change_set=change_set,
+                    persist=False,
+                    environment_id=None
+                )
+
+                return _Success(basis)
+
+        except Exception as e:
+            log.error('Error fetching file data: %s' % repr(e))
+            traceback.print_exc()
+            return _Fail(f"Error fetching file data: {str(e)}")
+
+    def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
+        """
+        Implementation of the Synchronizer.sync method.
+
+        Yields initial data from files, then continues to watch for file changes
+        and yield updates when files are modified.
+
+        :param ss: SelectorStore (not used, as we don't have selectors for file data)
+        :return: Generator yielding Update objects
+        """
+        # First yield initial data
+        initial_result = self.fetch(ss)
+        if isinstance(initial_result, _Fail):
+            yield Update(
+                state=DataSourceState.OFF,
+                error=DataSourceErrorInfo(
+                    kind=DataSourceErrorKind.INVALID_DATA,
+                    status_code=0,
+                    time=current_time_millis(),
+                    message=initial_result.error
+                )
+            )
+            return
+
+        # Yield the initial successful state
+        yield Update(
+            state=DataSourceState.VALID,
+            change_set=initial_result.value.change_set
+        )
+
+        # Start watching for file changes
+        with self._lock:
+            if not self._closed:
+                self._auto_updater = self._start_auto_updater()
+
+        # Continue yielding updates as they arrive
+        while not self._closed:
+            try:
+                # Wait for updates with a timeout to allow checking closed status
+                try:
+                    update = self._update_queue.get(timeout=1.0)
+                except Empty:
+                    continue
+
+                if update is None:  # Sentinel value for shutdown
+                    break
+
+                yield update
+
+            except Exception as e:
+                log.error('Error in file data synchronizer: %s' % repr(e))
+                traceback.print_exc()
+                yield Update(
+                    state=DataSourceState.OFF,
+                    error=DataSourceErrorInfo(
+                        kind=DataSourceErrorKind.UNKNOWN,
+                        status_code=0,
+                        time=current_time_millis(),
+                        message=f"Error in file data synchronizer: {str(e)}"
+                    )
+                )
+                break
+
+    def stop(self):
+        """Stop the data source and clean up resources."""
+        with self._lock:
+            if self._closed:
+                return
+            self._closed = True
+
+            auto_updater = self._auto_updater
+            self._auto_updater = None
+
+        if auto_updater:
+            auto_updater.stop()
+
+        # Signal shutdown to sync generator
+        self._update_queue.put(None)
+
+    def _load_all_to_changeset(self):
+        """
+        Load all files and build a changeset.
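+
+        In outline, the construction below is equivalent to (illustrative)::
+
+            builder = ChangeSetBuilder()
+            builder.start(IntentCode.TRANSFER_FULL)
+            builder.add_put(ObjectKind.FLAG, key, version, flag_data)
+            change_set = builder.finish(Selector.no_selector())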
+
+        :return: _Result containing ChangeSet or error string
+        """
+        flags_dict = {}
+        segments_dict = {}
+
+        for path in self._paths:
+            try:
+                self._load_file(path, flags_dict, segments_dict)
+            except Exception as e:
+                log.error('Unable to load flag data from "%s": %s' % (path, repr(e)))
+                traceback.print_exc()
+                return _Fail(f'Unable to load flag data from "{path}": {str(e)}')
+
+        # Build a full transfer changeset
+        builder = ChangeSetBuilder()
+        builder.start(IntentCode.TRANSFER_FULL)
+
+        # Add all flags to the changeset
+        for key, flag_data in flags_dict.items():
+            builder.add_put(
+                ObjectKind.FLAG,
+                key,
+                flag_data.get('version', 1),
+                flag_data
+            )
+
+        # Add all segments to the changeset
+        for key, segment_data in segments_dict.items():
+            builder.add_put(
+                ObjectKind.SEGMENT,
+                key,
+                segment_data.get('version', 1),
+                segment_data
+            )
+
+        # Use no_selector since we don't have versioning information from files
+        change_set = builder.finish(Selector.no_selector())
+
+        return _Success(change_set)
+
+    def _load_file(self, path, flags_dict, segments_dict):
+        """
+        Load a single file and add its contents to the provided dictionaries.
+
+        :param path: path to the file
+        :param flags_dict: dictionary to add flags to
+        :param segments_dict: dictionary to add segments to
+        """
+        content = None
+        with open(path, 'r') as f:
+            content = f.read()
+        parsed = self._parse_content(content)
+
+        for key, flag in parsed.get('flags', {}).items():
+            _sanitize_json_item(flag)
+            self._add_item(flags_dict, 'flags', flag)
+
+        for key, value in parsed.get('flagValues', {}).items():
+            self._add_item(flags_dict, 'flags', self._make_flag_with_value(key, value))
+
+        for key, segment in parsed.get('segments', {}).items():
+            _sanitize_json_item(segment)
+            self._add_item(segments_dict, 'segments', segment)
+
+    def _parse_content(self, content):
+        """
+        Parse file content as JSON or YAML.
+
+        :param content: file content string
+        :return: parsed dictionary
+        """
+        if have_yaml:
+            return yaml.safe_load(content)  # pyyaml correctly parses JSON too
+        return json.loads(content)
+
+    def _add_item(self, items_dict, kind_name, item):
+        """
+        Add an item to a dictionary, checking for duplicates.
+
+        :param items_dict: dictionary to add to
+        :param kind_name: name of the kind (for error messages)
+        :param item: item to add
+        """
+        key = item.get('key')
+        if items_dict.get(key) is None:
+            items_dict[key] = item
+        else:
+            raise Exception('In %s, key "%s" was used more than once' % (kind_name, key))
+
+    def _make_flag_with_value(self, key, value):
+        """
+        Create a simple flag configuration from a key-value pair.
+
+        :param key: flag key
+        :param value: flag value
+        :return: flag dictionary
+        """
+        return {'key': key, 'version': 1, 'on': True, 'fallthrough': {'variation': 0}, 'variations': [value]}
+
+    def _start_auto_updater(self):
+        """
+        Start watching files for changes.
+
+        :return: auto-updater instance
+        """
+        resolved_paths = []
+        for path in self._paths:
+            try:
+                resolved_paths.append(os.path.realpath(path))
+            except Exception:
+                log.warning('Cannot watch for changes to data file "%s" because it is an invalid path' % path)
+
+        if have_watchdog and not self._force_polling:
+            return _WatchdogAutoUpdaterV2(resolved_paths, self._on_file_change)
+        else:
+            return _PollingAutoUpdaterV2(resolved_paths, self._on_file_change, self._poll_interval)
+
+    def _on_file_change(self):
+        """
+        Callback invoked when files change.
+
+        Reloads all files and queues an update.
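+
+        Successful reloads are queued as VALID updates carrying a fresh
+        full-transfer changeset; failures are queued as INTERRUPTED updates so
+        that ``sync`` keeps running and can recover on a later change.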
+        """
+        with self._lock:
+            if self._closed:
+                return
+
+            try:
+                # Reload all files
+                result = self._load_all_to_changeset()
+
+                if isinstance(result, _Fail):
+                    # Queue an error update
+                    error_update = Update(
+                        state=DataSourceState.INTERRUPTED,
+                        error=DataSourceErrorInfo(
+                            kind=DataSourceErrorKind.INVALID_DATA,
+                            status_code=0,
+                            time=current_time_millis(),
+                            message=result.error
+                        )
+                    )
+                    self._update_queue.put(error_update)
+                else:
+                    # Queue a successful update
+                    update = Update(
+                        state=DataSourceState.VALID,
+                        change_set=result.value
+                    )
+                    self._update_queue.put(update)
+
+            except Exception as e:
+                log.error('Error processing file change: %s' % repr(e))
+                traceback.print_exc()
+                error_update = Update(
+                    state=DataSourceState.INTERRUPTED,
+                    error=DataSourceErrorInfo(
+                        kind=DataSourceErrorKind.UNKNOWN,
+                        status_code=0,
+                        time=current_time_millis(),
+                        message=f"Error processing file change: {str(e)}"
+                    )
+                )
+                self._update_queue.put(error_update)
+
+
+# Watch for changes to data files using the watchdog package. This uses native OS filesystem notifications
+# if available for the current platform.
+class _WatchdogAutoUpdaterV2:
+    def __init__(self, resolved_paths, on_change_callback):
+        watched_files = set(resolved_paths)
+
+        class LDWatchdogHandler(watchdog.events.FileSystemEventHandler):
+            def on_any_event(self, event):
+                if event.src_path in watched_files:
+                    on_change_callback()
+
+        dir_paths = set()
+        for path in resolved_paths:
+            dir_paths.add(os.path.dirname(path))
+
+        self._observer = watchdog.observers.Observer()
+        handler = LDWatchdogHandler()
+        for path in dir_paths:
+            self._observer.schedule(handler, path)
+        self._observer.start()
+
+    def stop(self):
+        self._observer.stop()
+        self._observer.join()
+
+
+# Watch for changes to data files by polling their modification times. This is used if auto-update is
+# on but the watchdog package is not installed.
+class _PollingAutoUpdaterV2:
+    def __init__(self, resolved_paths, on_change_callback, interval):
+        self._paths = resolved_paths
+        self._on_change = on_change_callback
+        self._file_times = self._check_file_times()
+        self._timer = RepeatingTask("ldclient.datasource.filev2.poll", interval, interval, self._poll)
+        self._timer.start()
+
+    def stop(self):
+        self._timer.stop()
+
+    def _poll(self):
+        new_times = self._check_file_times()
+        changed = False
+        for file_path, file_time in self._file_times.items():
+            if new_times.get(file_path) is not None and new_times.get(file_path) != file_time:
+                changed = True
+                break
+        self._file_times = new_times
+        if changed:
+            self._on_change()
+
+    def _check_file_times(self):
+        ret = {}
+        for path in self._paths:
+            try:
+                ret[path] = os.path.getmtime(path)
+            except Exception:
+                log.warning("Failed to get modification time for %s. Setting to None", path)
+                ret[path] = None
+        return ret
diff --git a/ldclient/integrations/__init__.py b/ldclient/integrations/__init__.py
index 0f0f0591..0d7ee156 100644
--- a/ldclient/integrations/__init__.py
+++ b/ldclient/integrations/__init__.py
@@ -3,8 +3,9 @@
 other than LaunchDarkly.
""" -from typing import Any, Dict, List, Mapping, Optional +from typing import Any, Dict, List, Mapping, Optional, Union +from ldclient.config import Builder from ldclient.feature_store import CacheConfig from ldclient.feature_store_helpers import CachingStoreWrapper from ldclient.impl.integrations.consul.consul_feature_store import ( @@ -17,6 +18,9 @@ _DynamoDBFeatureStoreCore ) from ldclient.impl.integrations.files.file_data_source import _FileDataSource +from ldclient.impl.integrations.files.file_data_sourcev2 import ( + _FileDataSourceV2 +) from ldclient.impl.integrations.redis.redis_big_segment_store import ( _RedisBigSegmentStore ) @@ -248,3 +252,62 @@ def new_data_source(paths: List[str], auto_update: bool = False, poll_interval: :return: an object (actually a lambda) to be stored in the ``update_processor_class`` configuration property """ return lambda config, store, ready: _FileDataSource(store, config.data_source_update_sink, ready, paths, auto_update, poll_interval, force_polling) + + @staticmethod + def new_data_source_v2(paths: List[str], poll_interval: float = 1, force_polling: bool = False) -> Builder[Any]: + """Provides a way to use local files as a source of feature flag state using the FDv2 protocol. + + This type is not stable, and not subject to any backwards + compatibility guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + + This returns a builder that can be used with the FDv2 data system configuration as both an + Initializer and a Synchronizer. When used as an Initializer, it reads files once. When used + as a Synchronizer, it watches for file changes and automatically updates when files are modified. + + To use this component with the FDv2 data system, call ``new_data_source_v2`` and use the returned + builder with the custom data system configuration: + :: + + from ldclient.integrations import Files + from ldclient.impl.datasystem.config import custom + + file_source = Files.new_data_source_v2(paths=['my_flags.json']) + + # Use as initializer only + data_system = custom().initializers([file_source]).build() + config = Config(data_system=data_system) + + # Use as synchronizer only + data_system = custom().synchronizers(file_source).build() + config = Config(data_system=data_system) + + # Use as both initializer and synchronizer + data_system = custom().initializers([file_source]).synchronizers(file_source).build() + config = Config(data_system=data_system) + + This will cause the client not to connect to LaunchDarkly to get feature flags. The + client may still make network connections to send analytics events, unless you have disabled + this in your configuration with ``send_events`` or ``offline``. + + The format of the data files is the same as for the v1 file data source, described in the + SDK Reference Guide on `Reading flags from a file `_. + Note that in order to use YAML, you will need to install the ``pyyaml`` package. + + If the data source encounters any error in any file-- malformed content, a missing file, or a + duplicate key-- it will not load flags from any of the files. + + :param paths: the paths of the source files for loading flag data. These may be absolute paths + or relative to the current working directory. Files will be parsed as JSON unless the ``pyyaml`` + package is installed, in which case YAML is also allowed. + :param poll_interval: (default: 1) the minimum interval, in seconds, between checks for file + modifications when used as a Synchronizer. 
+            from ``watchdog`` is not being used.
+        :param force_polling: (default: false) True if the data source should implement file watching via
+            polling the filesystem even if a native mechanism is available. This is mainly for SDK testing.
+
+        :return: a builder function that creates the file data source
+        """
+        return lambda config: _FileDataSourceV2(paths, poll_interval, force_polling)
diff --git a/ldclient/testing/integrations/test_file_data_sourcev2.py b/ldclient/testing/integrations/test_file_data_sourcev2.py
new file mode 100644
index 00000000..e69b2b93
--- /dev/null
+++ b/ldclient/testing/integrations/test_file_data_sourcev2.py
@@ -0,0 +1,469 @@
+import json
+import os
+import tempfile
+import threading
+import time
+
+import pytest
+
+from ldclient.config import Config
+from ldclient.impl.datasystem.protocolv2 import (
+    IntentCode,
+    ObjectKind,
+    Selector
+)
+from ldclient.impl.util import _Fail, _Success
+from ldclient.integrations import Files
+from ldclient.interfaces import DataSourceState
+from ldclient.testing.mock_components import MockSelectorStore
+
+have_yaml = False
+try:
+    import yaml
+    have_yaml = True
+except ImportError:
+    pass
+
+
+all_properties_json = '''
+{
+  "flags": {
+    "flag1": {
+      "key": "flag1",
+      "on": true,
+      "fallthrough": {
+        "variation": 2
+      },
+      "variations": [ "fall", "off", "on" ]
+    }
+  },
+  "flagValues": {
+    "flag2": "value2"
+  },
+  "segments": {
+    "seg1": {
+      "key": "seg1",
+      "include": ["user1"]
+    }
+  }
+}
+'''
+
+all_properties_yaml = '''
+---
+flags:
+  flag1:
+    key: flag1
+    "on": true
+flagValues:
+  flag2: value2
+segments:
+  seg1:
+    key: seg1
+    include: ["user1"]
+'''
+
+flag_only_json = '''
+{
+  "flags": {
+    "flag1": {
+      "key": "flag1",
+      "on": true,
+      "fallthrough": {
+        "variation": 2
+      },
+      "variations": [ "fall", "off", "on" ]
+    }
+  }
+}
+'''
+
+segment_only_json = '''
+{
+  "segments": {
+    "seg1": {
+      "key": "seg1",
+      "include": ["user1"]
+    }
+  }
+}
+'''
+
+flag_values_only_json = '''
+{
+  "flagValues": {
+    "flag2": "value2"
+  }
+}
+'''
+
+
+def make_temp_file(content):
+    """Create a temporary file with the given content."""
+    f, path = tempfile.mkstemp()
+    os.write(f, content.encode("utf-8"))
+    os.close(f)
+    return path
+
+
+def replace_file(path, content):
+    """Replace the contents of a file."""
+    with open(path, 'w') as f:
+        f.write(content)
+
+
+def test_creates_valid_initializer():
+    """Test that FileDataSourceV2 creates a working initializer."""
+    path = make_temp_file(all_properties_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path])
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Success)
+
+        basis = result.value
+        assert not basis.persist
+        assert basis.environment_id is None
+        assert basis.change_set.intent_code == IntentCode.TRANSFER_FULL
+
+        # Should have 2 flags and 1 segment
+        changes = basis.change_set.changes
+        assert len(changes) == 3
+
+        flag_changes = [c for c in changes if c.kind == ObjectKind.FLAG]
+        segment_changes = [c for c in changes if c.kind == ObjectKind.SEGMENT]
+
+        assert len(flag_changes) == 2
+        assert len(segment_changes) == 1
+
+        # Check selector is no_selector
+        assert basis.change_set.selector == Selector.no_selector()
+    finally:
+        os.remove(path)
+
+
+def test_initializer_handles_missing_file():
+    """Test that initializer returns error for missing file."""
+    file_source = Files.new_data_source_v2(paths=['no-such-file.json'])
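+    # Note: the builder does no file I/O at construction time, so the error
+    # surfaces only from fetch().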
+    initializer = file_source(Config(sdk_key="dummy"))
+
+    result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+    assert isinstance(result, _Fail)
+    assert "no-such-file.json" in result.error
+
+
+def test_initializer_handles_invalid_json():
+    """Test that initializer returns error for invalid JSON."""
+    path = make_temp_file('{"flagValues":{')
+    try:
+        file_source = Files.new_data_source_v2(paths=[path])
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Fail)
+        assert "Unable to load flag data" in result.error
+    finally:
+        os.remove(path)
+
+
+def test_initializer_handles_duplicate_keys():
+    """Test that initializer returns error when same key appears in multiple files."""
+    path1 = make_temp_file(flag_only_json)
+    path2 = make_temp_file(flag_only_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path1, path2])
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Fail)
+        assert "was used more than once" in result.error
+    finally:
+        os.remove(path1)
+        os.remove(path2)
+
+
+def test_initializer_loads_multiple_files():
+    """Test that initializer can load from multiple files."""
+    path1 = make_temp_file(flag_only_json)
+    path2 = make_temp_file(segment_only_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path1, path2])
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Success)
+
+        changes = result.value.change_set.changes
+        flag_changes = [c for c in changes if c.kind == ObjectKind.FLAG]
+        segment_changes = [c for c in changes if c.kind == ObjectKind.SEGMENT]
+
+        assert len(flag_changes) == 1
+        assert len(segment_changes) == 1
+    finally:
+        os.remove(path1)
+        os.remove(path2)
+
+
+def test_initializer_loads_yaml():
+    """Test that initializer can parse YAML files."""
+    if not have_yaml:
+        pytest.skip("skipping YAML test because pyyaml isn't available")
+
+    path = make_temp_file(all_properties_yaml)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path])
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Success)
+
+        changes = result.value.change_set.changes
+        assert len(changes) == 3  # 2 flags + 1 segment
+    finally:
+        os.remove(path)
+
+
+def test_initializer_handles_flag_values():
+    """Test that initializer properly converts flagValues to flags."""
+    path = make_temp_file(flag_values_only_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path])
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Success)
+
+        changes = result.value.change_set.changes
+        flag_changes = [c for c in changes if c.kind == ObjectKind.FLAG]
+        assert len(flag_changes) == 1
+
+        # Check the flag was created with the expected structure
+        flag_change = flag_changes[0]
+        assert flag_change.key == "flag2"
+        assert flag_change.object['key'] == "flag2"
+        assert flag_change.object['on'] is True
+        assert flag_change.object['variations'] == ["value2"]
+    finally:
+        os.remove(path)
+
+
+def test_creates_valid_synchronizer():
+    """Test that FileDataSourceV2 creates a working synchronizer."""
+    path = make_temp_file(all_properties_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path], force_polling=True, poll_interval=0.1)
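+        # A short poll_interval with force_polling keeps change detection fast
+        # and deterministic for the test, regardless of watchdog availability.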
+        synchronizer = file_source(Config(sdk_key="dummy"))
+
+        updates = []
+        update_count = 0
+
+        def collect_updates():
+            nonlocal update_count
+            for update in synchronizer.sync(MockSelectorStore(Selector.no_selector())):
+                updates.append(update)
+                update_count += 1
+
+                if update_count == 1:
+                    # Should get initial state
+                    assert update.state == DataSourceState.VALID
+                    assert update.change_set is not None
+                    assert update.change_set.intent_code == IntentCode.TRANSFER_FULL
+                    assert len(update.change_set.changes) == 3
+                    synchronizer.stop()
+                    break
+
+        # Start the synchronizer in a thread with timeout to prevent hanging
+        sync_thread = threading.Thread(target=collect_updates)
+        sync_thread.start()
+
+        # Wait for the thread to complete with timeout
+        sync_thread.join(timeout=5)
+
+        # Ensure thread completed successfully
+        if sync_thread.is_alive():
+            synchronizer.stop()
+            sync_thread.join()
+            pytest.fail("Synchronizer test timed out after 5 seconds")
+
+        assert len(updates) == 1
+    finally:
+        synchronizer.stop()
+        os.remove(path)
+
+
+def test_synchronizer_detects_file_changes():
+    """Test that synchronizer detects and reports file changes."""
+    path = make_temp_file(flag_only_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path], force_polling=True, poll_interval=0.1)
+        synchronizer = file_source(Config(sdk_key="dummy"))
+
+        updates = []
+        update_event = threading.Event()
+
+        def collect_updates():
+            for update in synchronizer.sync(MockSelectorStore(Selector.no_selector())):
+                updates.append(update)
+                update_event.set()
+
+                if len(updates) >= 2:
+                    break
+
+        # Start the synchronizer
+        sync_thread = threading.Thread(target=collect_updates)
+        sync_thread.start()
+
+        # Wait for initial update
+        assert update_event.wait(timeout=2), "Did not receive initial update"
+        assert len(updates) == 1
+        assert updates[0].state == DataSourceState.VALID
+        initial_changes = [c for c in updates[0].change_set.changes if c.kind == ObjectKind.FLAG]
+        assert len(initial_changes) == 1
+
+        # Modify the file
+        update_event.clear()
+        time.sleep(0.2)  # Ensure filesystem timestamp changes
+        replace_file(path, segment_only_json)
+
+        # Wait for the change to be detected
+        assert update_event.wait(timeout=2), "Did not receive update after file change"
+        assert len(updates) == 2
+        assert updates[1].state == DataSourceState.VALID
+        segment_changes = [c for c in updates[1].change_set.changes if c.kind == ObjectKind.SEGMENT]
+        assert len(segment_changes) == 1
+
+        synchronizer.stop()
+        sync_thread.join(timeout=2)
+    finally:
+        synchronizer.stop()
+        os.remove(path)
+
+
+def test_synchronizer_reports_error_on_invalid_file_update():
+    """Test that synchronizer reports error when file becomes invalid."""
+    path = make_temp_file(flag_only_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path], force_polling=True, poll_interval=0.1)
+        synchronizer = file_source(Config(sdk_key="dummy"))
+
+        updates = []
+        update_event = threading.Event()
+
+        def collect_updates():
+            for update in synchronizer.sync(MockSelectorStore(Selector.no_selector())):
+                updates.append(update)
+                update_event.set()
+
+                if len(updates) >= 2:
+                    break
+
+        # Start the synchronizer
+        sync_thread = threading.Thread(target=collect_updates)
+        sync_thread.start()
+
+        # Wait for initial update
+        assert update_event.wait(timeout=2), "Did not receive initial update"
+        assert len(updates) == 1
+        assert updates[0].state == DataSourceState.VALID
+
+        # Make the file invalid
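+        # (truncated JSON: parsing fails and surfaces as an INTERRUPTED update)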
+        update_event.clear()
+        time.sleep(0.2)  # Ensure filesystem timestamp changes
+        replace_file(path, '{"invalid json')
+
+        # Wait for the error to be detected
+        assert update_event.wait(timeout=2), "Did not receive update after file became invalid"
+        assert len(updates) == 2
+        assert updates[1].state == DataSourceState.INTERRUPTED
+        assert updates[1].error is not None
+
+        synchronizer.stop()
+        sync_thread.join(timeout=2)
+    finally:
+        synchronizer.stop()
+        os.remove(path)
+
+
+def test_synchronizer_can_be_stopped():
+    """Test that synchronizer stops cleanly."""
+    path = make_temp_file(all_properties_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path])
+        synchronizer = file_source(Config(sdk_key="dummy"))
+
+        updates = []
+
+        def collect_updates():
+            for update in synchronizer.sync(MockSelectorStore(Selector.no_selector())):
+                updates.append(update)
+
+        # Start the synchronizer
+        sync_thread = threading.Thread(target=collect_updates)
+        sync_thread.start()
+
+        # Give it a moment to process initial data
+        time.sleep(0.2)
+
+        # Stop it
+        synchronizer.stop()
+
+        # Thread should complete
+        sync_thread.join(timeout=2)
+        assert not sync_thread.is_alive()
+
+        # Should have received at least the initial update
+        assert len(updates) >= 1
+        assert updates[0].state == DataSourceState.VALID
+    finally:
+        os.remove(path)
+
+
+def test_fetch_after_stop_returns_error():
+    """Test that fetch returns error after synchronizer is stopped."""
+    path = make_temp_file(all_properties_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path])
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        # First fetch should work
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Success)
+
+        # Stop the source
+        initializer.stop()
+
+        # Second fetch should fail
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Fail)
+        assert "closed" in result.error
+    finally:
+        os.remove(path)
+
+
+def test_source_name_property():
+    """Test that the data source has the correct name."""
+    path = make_temp_file(all_properties_json)
+    try:
+        file_source = Files.new_data_source_v2(paths=[path])
+        source = file_source(Config(sdk_key="dummy"))
+
+        assert source.name == "FileDataV2"
+    finally:
+        source.stop()
+        os.remove(path)
+
+
+def test_accepts_single_path_string():
+    """Test that paths parameter can be a single string."""
+    path = make_temp_file(flag_only_json)
+    try:
+        # Pass a single string instead of a list
+        file_source = Files.new_data_source_v2(paths=path)
+        initializer = file_source(Config(sdk_key="dummy"))
+
+        result = initializer.fetch(MockSelectorStore(Selector.no_selector()))
+        assert isinstance(result, _Success)
+        assert len(result.value.change_set.changes) == 1
+    finally:
+        os.remove(path)