From 4bdc298bf119f56f0456d862e3107f46472e18e2 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 23 Dec 2025 20:08:24 +0000 Subject: [PATCH 1/9] chore: Add the FDv2 data system protocol implementation --- lib/ldclient-rb/config.rb | 62 ++ lib/ldclient-rb/datasystem.rb | 226 ++++++ .../impl/data_source/status_provider.rb | 78 ++ .../feature_store_client_wrapper.rb | 168 ++++ .../data_store/in_memory_feature_store.rb | 130 +++ .../impl/data_store/status_provider.rb | 78 ++ lib/ldclient-rb/impl/data_store/store.rb | 371 +++++++++ lib/ldclient-rb/impl/data_system/fdv2.rb | 476 +++++++++++ .../impl/data_system/protocolv2.rb | 264 ++++++ lib/ldclient-rb/interfaces/data_system.rb | 755 ++++++++++++++++++ 10 files changed, 2608 insertions(+) create mode 100644 lib/ldclient-rb/datasystem.rb create mode 100644 lib/ldclient-rb/impl/data_source/status_provider.rb create mode 100644 lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb create mode 100644 lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb create mode 100644 lib/ldclient-rb/impl/data_store/status_provider.rb create mode 100644 lib/ldclient-rb/impl/data_store/store.rb create mode 100644 lib/ldclient-rb/impl/data_system/fdv2.rb create mode 100644 lib/ldclient-rb/impl/data_system/protocolv2.rb create mode 100644 lib/ldclient-rb/interfaces/data_system.rb diff --git a/lib/ldclient-rb/config.rb b/lib/ldclient-rb/config.rb index ef556a05..80e59547 100644 --- a/lib/ldclient-rb/config.rb +++ b/lib/ldclient-rb/config.rb @@ -45,6 +45,7 @@ class Config # @option opts [Hash] :application See {#application} # @option opts [String] :payload_filter_key See {#payload_filter_key} # @option opts [Boolean] :omit_anonymous_contexts See {#omit_anonymous_contexts} + # @option opts [DataSystemConfig] :datasystem_config See {#datasystem_config} # @option hooks [Array LaunchDarkly::Interfaces::DataSystem::Initializer>, nil] Array of builder procs that take Config and return an Initializer + # @param primary_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] Builder proc that takes Config and returns the primary Synchronizer + # @param secondary_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] Builder proc that takes Config and returns the secondary Synchronizer + # @param data_store_mode [Symbol] The data store mode + # @param data_store [LaunchDarkly::Interfaces::FeatureStore, nil] The (optional) data store + # @param fdv1_fallback_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] + # The (optional) builder proc for FDv1-compatible fallback synchronizer + # + def initialize(initializers:, primary_synchronizer:, secondary_synchronizer:, + data_store_mode: LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil) + @initializers = initializers + @primary_synchronizer = primary_synchronizer + @secondary_synchronizer = secondary_synchronizer + @data_store_mode = data_store_mode + @data_store = data_store + @fdv1_fallback_synchronizer = fdv1_fallback_synchronizer + end + + # The initializers for the data system. Each proc takes sdk_key and Config and returns an Initializer. + # @return [Array LaunchDarkly::Interfaces::DataSystem::Initializer>, nil] + attr_reader :initializers + + # The primary synchronizer builder. Takes sdk_key and Config and returns a Synchronizer. + # @return [Proc(String, Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] + attr_reader :primary_synchronizer + + # The secondary synchronizer builder. Takes sdk_key and Config and returns a Synchronizer. + # @return [Proc(String, Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] + attr_reader :secondary_synchronizer + + # The data store mode. + # @return [Symbol] + attr_reader :data_store_mode + + # The data store. + # @return [LaunchDarkly::Interfaces::FeatureStore, nil] + attr_reader :data_store + + # The FDv1-compatible fallback synchronizer builder. Takes sdk_key and Config and returns a Synchronizer. + # @return [Proc(String, Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] + attr_reader :fdv1_fallback_synchronizer + end end diff --git a/lib/ldclient-rb/datasystem.rb b/lib/ldclient-rb/datasystem.rb new file mode 100644 index 00000000..fef3025f --- /dev/null +++ b/lib/ldclient-rb/datasystem.rb @@ -0,0 +1,226 @@ +# frozen_string_literal: true + +require 'ldclient-rb/interfaces/data_system' +require 'ldclient-rb/config' + +module LaunchDarkly + # + # Configuration for LaunchDarkly's data acquisition strategy. + # + # This module provides factory methods for creating data system configurations. + # + module DataSystem + # + # Builder for the data system configuration. + # + class ConfigBuilder + def initialize + @initializers = nil + @primary_synchronizer = nil + @secondary_synchronizer = nil + @fdv1_fallback_synchronizer = nil + @data_store_mode = LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY + @data_store = nil + end + + # + # Sets the initializers for the data system. + # + # @param initializers [Array LaunchDarkly::Interfaces::DataSystem::Initializer>] + # Array of builder procs that take sdk_key and Config and return an Initializer + # @return [ConfigBuilder] self for chaining + # + def initializers(initializers) + @initializers = initializers + self + end + + # + # Sets the synchronizers for the data system. + # + # @param primary [Proc(String, Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer] Builder proc that takes sdk_key and Config and returns the primary Synchronizer + # @param secondary [Proc(String, Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] + # Builder proc that takes sdk_key and Config and returns the secondary Synchronizer + # @return [ConfigBuilder] self for chaining + # + def synchronizers(primary, secondary = nil) + @primary_synchronizer = primary + @secondary_synchronizer = secondary + self + end + + # + # Configures the SDK with a fallback synchronizer that is compatible with + # the Flag Delivery v1 API. + # + # @param fallback [Proc(String, Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer] + # Builder proc that takes sdk_key and Config and returns the fallback Synchronizer + # @return [ConfigBuilder] self for chaining + # + def fdv1_compatible_synchronizer(fallback) + @fdv1_fallback_synchronizer = fallback + self + end + + # + # Sets the data store configuration for the data system. + # + # @param data_store [LaunchDarkly::Interfaces::FeatureStore] The data store + # @param store_mode [Symbol] The store mode + # @return [ConfigBuilder] self for chaining + # + def data_store(data_store, store_mode) + @data_store = data_store + @data_store_mode = store_mode + self + end + + # + # Builds the data system configuration. + # + # @return [DataSystemConfig] + # @raise [ArgumentError] if configuration is invalid + # + def build + if @secondary_synchronizer && @primary_synchronizer.nil? + raise ArgumentError, "Primary synchronizer must be set if secondary is set" + end + + DataSystemConfig.new( + initializers: @initializers, + primary_synchronizer: @primary_synchronizer, + secondary_synchronizer: @secondary_synchronizer, + data_store_mode: @data_store_mode, + data_store: @data_store, + fdv1_fallback_synchronizer: @fdv1_fallback_synchronizer + ) + end + end + + # @private + def self.polling_ds_builder + # TODO(fdv2): Implement polling data source builder + lambda do |_sdk_key, _config| + raise NotImplementedError, "Polling data source not yet implemented for FDv2" + end + end + + # @private + def self.fdv1_fallback_ds_builder + # TODO(fdv2): Implement FDv1 fallback polling data source builder + lambda do |_sdk_key, _config| + raise NotImplementedError, "FDv1 fallback data source not yet implemented for FDv2" + end + end + + # @private + def self.streaming_ds_builder + # TODO(fdv2): Implement streaming data source builder + lambda do |_sdk_key, _config| + raise NotImplementedError, "Streaming data source not yet implemented for FDv2" + end + end + + # + # Default is LaunchDarkly's recommended flag data acquisition strategy. + # + # Currently, it operates a two-phase method for obtaining data: first, it + # requests data from LaunchDarkly's global CDN. Then, it initiates a + # streaming connection to LaunchDarkly's Flag Delivery services to + # receive real-time updates. + # + # If the streaming connection is interrupted for an extended period of + # time, the SDK will automatically fall back to polling the global CDN + # for updates. + # + # @return [ConfigBuilder] + # + def self.default + polling_builder = polling_ds_builder + streaming_builder = streaming_ds_builder + fallback = fdv1_fallback_ds_builder + + builder = ConfigBuilder.new + builder.initializers([polling_builder]) + builder.synchronizers(streaming_builder, polling_builder) + builder.fdv1_compatible_synchronizer(fallback) + + builder + end + + # + # Streaming configures the SDK to efficiently stream flag/segment data + # in the background, allowing evaluations to operate on the latest data + # with no additional latency. + # + # @return [ConfigBuilder] + # + def self.streaming + streaming_builder = streaming_ds_builder + fallback = fdv1_fallback_ds_builder + + builder = ConfigBuilder.new + builder.synchronizers(streaming_builder) + builder.fdv1_compatible_synchronizer(fallback) + + builder + end + + # + # Polling configures the SDK to regularly poll an endpoint for + # flag/segment data in the background. This is less efficient than + # streaming, but may be necessary in some network environments. + # + # @return [ConfigBuilder] + # + def self.polling + polling_builder = polling_ds_builder + fallback = fdv1_fallback_ds_builder + + builder = ConfigBuilder.new + builder.synchronizers(polling_builder) + builder.fdv1_compatible_synchronizer(fallback) + + builder + end + + # + # Custom returns a builder suitable for creating a custom data + # acquisition strategy. You may configure how the SDK uses a Persistent + # Store, how the SDK obtains an initial set of data, and how the SDK + # keeps data up-to-date. + # + # @return [ConfigBuilder] + # + def self.custom + ConfigBuilder.new + end + + # + # Daemon configures the SDK to read from a persistent store integration + # that is populated by Relay Proxy or other SDKs. The SDK will not connect + # to LaunchDarkly. In this mode, the SDK never writes to the data store. + # + # @param store [Object] The persistent store + # @return [ConfigBuilder] + # + def self.daemon(store) + default.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY) + end + + # + # PersistentStore is similar to default, with the addition of a persistent + # store integration. Before data has arrived from LaunchDarkly, the SDK is + # able to evaluate flags using data from the persistent store. Once fresh + # data is available, the SDK will no longer read from the persistent store, + # although it will keep it up-to-date. + # + # @param store [Object] The persistent store + # @return [ConfigBuilder] + # + def self.persistent_store(store) + default.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_WRITE) + end + end +end + diff --git a/lib/ldclient-rb/impl/data_source/status_provider.rb b/lib/ldclient-rb/impl/data_source/status_provider.rb new file mode 100644 index 00000000..087e15c0 --- /dev/null +++ b/lib/ldclient-rb/impl/data_source/status_provider.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +require "concurrent" +require "forwardable" +require "ldclient-rb/interfaces" + +module LaunchDarkly + module Impl + module DataSource + # + # Provides status tracking and listener management for data sources. + # + # This class implements the {LaunchDarkly::Interfaces::DataSource::StatusProvider} interface. + # It maintains the current status of the data source and broadcasts status changes to listeners. + # + class StatusProviderV2 + include LaunchDarkly::Interfaces::DataSource::StatusProvider + + extend Forwardable + def_delegators :@status_broadcaster, :add_listener, :remove_listener + + # + # Creates a new status provider. + # + # @param status_broadcaster [LaunchDarkly::Impl::Broadcaster] Broadcaster for status changes + # + def initialize(status_broadcaster) + @status_broadcaster = status_broadcaster + @status = LaunchDarkly::Interfaces::DataSource::Status.new( + LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING, + Time.now, + nil + ) + @lock = Concurrent::ReadWriteLock.new + end + + # (see LaunchDarkly::Interfaces::DataSource::StatusProvider#status) + def status + @lock.with_read_lock do + @status + end + end + + # (see LaunchDarkly::Interfaces::DataSource::UpdateSink#update_status) + def update_status(new_state, new_error) + status_to_broadcast = nil + + @lock.with_write_lock do + old_status = @status + + # Special handling: INTERRUPTED during INITIALIZING stays INITIALIZING + if new_state == LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED && + old_status.state == LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING + new_state = LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING + end + + # No change if state is the same and no error + return if new_state == old_status.state && new_error.nil? + + new_since = new_state == old_status.state ? @status.state_since : Time.now + new_error = @status.last_error if new_error.nil? + + @status = LaunchDarkly::Interfaces::DataSource::Status.new( + new_state, + new_since, + new_error + ) + + status_to_broadcast = @status + end + + @status_broadcaster.broadcast(status_to_broadcast) if status_to_broadcast + end + end + end + end +end + diff --git a/lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb b/lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb new file mode 100644 index 00000000..ff1cdeb4 --- /dev/null +++ b/lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb @@ -0,0 +1,168 @@ +# frozen_string_literal: true + +require "concurrent" +require "ldclient-rb/interfaces" +require "ldclient-rb/impl/store_data_set_sorter" +require "ldclient-rb/impl/repeating_task" + +module LaunchDarkly + module Impl + module DataStore + # + # Provides additional behavior that the client requires before or after feature store operations. + # Currently this just means sorting the data set for init() and dealing with data store status listeners. + # + class FeatureStoreClientWrapperV2 + include LaunchDarkly::Interfaces::FeatureStore + + # + # Initialize the wrapper. + # + # @param store [LaunchDarkly::Interfaces::FeatureStore] The underlying feature store + # @param store_update_sink [LaunchDarkly::Impl::DataStore::StatusProviderV2] The status provider for updates + # @param logger [Logger] The logger instance + # + def initialize(store, store_update_sink, logger) + @store = store + @store_update_sink = store_update_sink + @logger = logger + @monitoring_enabled = does_store_support_monitoring? + + # Thread synchronization + @mutex = Mutex.new + @last_available = true + @poller = nil + end + + # (see LaunchDarkly::Interfaces::FeatureStore#init) + def init(all_data) + wrapper { @store.init(FeatureStoreDataSetSorter.sort_all_collections(all_data)) } + end + + # (see LaunchDarkly::Interfaces::FeatureStore#get) + def get(kind, key) + wrapper { @store.get(kind, key) } + end + + # (see LaunchDarkly::Interfaces::FeatureStore#all) + def all(kind) + wrapper { @store.all(kind) } + end + + # (see LaunchDarkly::Interfaces::FeatureStore#delete) + def delete(kind, key, version) + wrapper { @store.delete(kind, key, version) } + end + + # (see LaunchDarkly::Interfaces::FeatureStore#upsert) + def upsert(kind, item) + wrapper { @store.upsert(kind, item) } + end + + # (see LaunchDarkly::Interfaces::FeatureStore#initialized?) + def initialized? + @store.initialized? + end + + # + # Returns whether monitoring is enabled. + # + # @return [Boolean] + # + def monitoring_enabled? + @monitoring_enabled + end + + # + # Wraps store operations with exception handling and availability tracking. + # + # @yield The block to execute + # @return [Object] The result of the block + # + private def wrapper + begin + yield + rescue StandardError + update_availability(false) if @monitoring_enabled + raise + end + end + + # + # Updates the availability status of the store. + # + # @param available [Boolean] Whether the store is available + # @return [void] + # + private def update_availability(available) + @mutex.synchronize do + return if available == @last_available + @last_available = available + end + + if available + @logger.warn { "[LDClient] Persistent store is available again" } + end + + status = LaunchDarkly::Interfaces::DataStore::Status.new(available, true) + @store_update_sink.update_status(status) + + if available + @mutex.synchronize do + return if @poller.nil? + + @poller.stop + @poller = nil + end + + return + end + + @logger.warn { "[LDClient] Detected persistent store unavailability; updates will be cached until it recovers" } + + task = LaunchDarkly::Impl::RepeatingTask.new(0.5, 0, method(:check_availability), @logger, "LDClient/DataStoreWrapperV2#check-availability") + + @mutex.synchronize do + @poller = task + @poller.start + end + end + + # + # Checks if the store is available. + # + # @return [void] + # + private def check_availability + begin + update_availability(true) if @store.available? + rescue => e + @logger.error { "[LDClient] Unexpected error from data store status function: #{e.message}" } + end + end + + # + # Determines whether the wrapped store can support enabling monitoring. + # + # The wrapped store must provide a monitoring_enabled? method, which must + # be true. But this alone is not sufficient. + # + # Because this class wraps all interactions with a provided store, it can + # technically "monitor" any store. However, monitoring also requires that + # we notify listeners when the store is available again. + # + # We determine this by checking the store's available? method, so this + # is also a requirement for monitoring support. + # + # @return [Boolean] + # + private def does_store_support_monitoring? + return false unless @store.respond_to?(:monitoring_enabled?) + return false unless @store.respond_to?(:available?) + + @store.monitoring_enabled? + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb new file mode 100644 index 00000000..5512f8fa --- /dev/null +++ b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +require "concurrent" +require "concurrent/atomics" +require "ldclient-rb/impl/data_store" +require "ldclient-rb/interfaces/data_system" + +module LaunchDarkly + module Impl + module DataStore + # + # InMemoryFeatureStoreV2 is a read-only in-memory store implementation for FDv2. + # + class InMemoryFeatureStoreV2 + include LaunchDarkly::Interfaces::DataSystem::ReadOnlyStore + def initialize + @lock = Concurrent::ReadWriteLock.new + @initialized = Concurrent::AtomicBoolean.new(false) + @items = {} + end + + # + # (see LaunchDarkly::Interfaces::DataSystem::ReadOnlyStore#get) + # + def get(kind, key) + @lock.with_read_lock do + items_of_kind = @items[kind] + return nil if items_of_kind.nil? + + item = items_of_kind[key.to_sym] + return nil if item.nil? + return nil if item[:deleted] + + item + end + end + + # + # (see LaunchDarkly::Interfaces::DataSystem::ReadOnlyStore#all) + # + def all(kind) + @lock.with_read_lock do + items_of_kind = @items[kind] + return {} if items_of_kind.nil? + + items_of_kind.select { |_k, item| !item[:deleted] } + end + end + + # + # (see LaunchDarkly::Interfaces::DataSystem::ReadOnlyStore#initialized?) + # + def initialized? + @initialized.value + end + + # + # Initializes the store with a full set of data, replacing any existing data. + # + # @param collections [Hash>] Hash of data kinds to collections of items + # @return [Boolean] true if successful, false otherwise + # + def set_basis(collections) + all_decoded = decode_collection(collections) + return false if all_decoded.nil? + + @lock.with_write_lock do + @items.clear + @items.update(all_decoded) + @initialized.make_true + end + + true + rescue => e + LaunchDarkly::Impl.log.error { "[LDClient] Failed applying set_basis: #{e.message}" } + false + end + + # + # Applies a delta update to the store. + # + # @param collections [Hash>] Hash of data kinds to collections with updates + # @return [Boolean] true if successful, false otherwise + # + def apply_delta(collections) + all_decoded = decode_collection(collections) + return false if all_decoded.nil? + + @lock.with_write_lock do + all_decoded.each do |kind, kind_data| + items_of_kind = @items[kind] ||= {} + kind_data.each do |key, item| + items_of_kind[key.to_sym] = item + end + end + end + + true + rescue => e + LaunchDarkly::Impl.log.error { "[LDClient] Failed applying apply_delta: #{e.message}" } + false + end + + # + # Decodes a collection of items. + # + # @param collections [Hash>] Hash of data kinds to collections + # @return [Hash>, nil] Decoded collection with symbol keys, or nil on error + # + private def decode_collection(collections) + all_decoded = {} + + collections.each do |kind, collection| + items_decoded = {} + collection.each do |key, item| + # Items are already in decoded format for FDv2 + items_decoded[key.to_sym] = item + end + all_decoded[kind] = items_decoded + end + + all_decoded + rescue => e + LaunchDarkly::Impl.log.error { "[LDClient] Failed decoding collection: #{e.message}" } + nil + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/data_store/status_provider.rb b/lib/ldclient-rb/impl/data_store/status_provider.rb new file mode 100644 index 00000000..506b26e3 --- /dev/null +++ b/lib/ldclient-rb/impl/data_store/status_provider.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +require "concurrent" +require "ldclient-rb/interfaces" + +module LaunchDarkly + module Impl + module DataStore + # + # StatusProviderV2 is the FDv2-specific implementation of {LaunchDarkly::Interfaces::DataStore::StatusProvider}. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class StatusProviderV2 + include LaunchDarkly::Interfaces::DataStore::StatusProvider + + # + # Initialize the status provider. + # + # @param store [Object, nil] The feature store (may be nil for in-memory only) + # @param listeners [LaunchDarkly::Impl::Broadcaster] Broadcaster for status changes + # + def initialize(store, listeners) + @store = store + @listeners = listeners + @lock = Concurrent::ReadWriteLock.new + @status = LaunchDarkly::Interfaces::DataStore::Status.new(true, false) + end + + # (see LaunchDarkly::Interfaces::DataStore::UpdateSink#update_status) + def update_status(status) + modified = false + + @lock.with_write_lock do + if @status.available != status.available || @status.stale != status.stale + @status = status + modified = true + end + end + + @listeners.broadcast(status) if modified + end + + # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#status) + def status + @lock.with_read_lock do + LaunchDarkly::Interfaces::DataStore::Status.new(@status.available, @status.stale) + end + end + + # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#monitoring_enabled?) + def monitoring_enabled? + return false if @store.nil? + return false unless @store.respond_to?(:monitoring_enabled?) + + @store.monitoring_enabled? + end + + # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#add_listener) + def add_listener(listener) + @listeners.add(listener) + end + + # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#remove_listener) + def remove_listener(listener) + @listeners.remove(listener) + end + end + end + end +end + + + diff --git a/lib/ldclient-rb/impl/data_store/store.rb b/lib/ldclient-rb/impl/data_store/store.rb new file mode 100644 index 00000000..82b2e4dd --- /dev/null +++ b/lib/ldclient-rb/impl/data_store/store.rb @@ -0,0 +1,371 @@ +# frozen_string_literal: true + +require "concurrent" +require "set" +require "ldclient-rb/impl/data_store" +require "ldclient-rb/impl/data_store/in_memory_feature_store" +require "ldclient-rb/impl/dependency_tracker" +require "ldclient-rb/interfaces/data_system" + +module LaunchDarkly + module Impl + module DataStore + # + # Store is a dual-mode persistent/in-memory store that serves requests for + # data from the evaluation algorithm. + # + # At any given moment one of two stores is active: in-memory, or persistent. + # Once the in-memory store has data (either from initializers or a + # synchronizer), the persistent store is no longer read from. From that point + # forward, calls to get data will serve from the memory store. + # + class Store + include LaunchDarkly::Interfaces::DataSystem::SelectorStore + + # + # Initialize a new Store. + # + # @param flag_change_broadcaster [LaunchDarkly::Impl::Broadcaster] Broadcaster for flag change events + # @param change_set_broadcaster [LaunchDarkly::Impl::Broadcaster] Broadcaster for changeset events + # @param logger [Logger] The logger instance + # + def initialize(flag_change_broadcaster, change_set_broadcaster, logger) + @logger = logger + @persistent_store = nil + @persistent_store_status_provider = nil + @persistent_store_writable = false + + # Source of truth for flag evaluations once initialized + @memory_store = InMemoryFeatureStoreV2.new + + # Used to track dependencies between items in the store + @dependency_tracker = LaunchDarkly::Impl::DependencyTracker.new + + # Broadcasters for events + @flag_change_broadcaster = flag_change_broadcaster + @change_set_broadcaster = change_set_broadcaster + + # True if the data in the memory store may be persisted to the persistent store + @persist = false + + # Points to the active store. Swapped upon initialization. + @active_store = @memory_store + + # Identifies the current data + @selector = LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + + # Thread synchronization + @lock = Mutex.new + end + + # + # Configure the store with a persistent store for read-only or read-write access. + # + # @param persistent_store [LaunchDarkly::Interfaces::FeatureStore] The persistent store implementation + # @param writable [Boolean] Whether the persistent store should be written to + # @param status_provider [LaunchDarkly::Impl::DataStore::StatusProviderV2, nil] Optional status provider for the persistent store + # @return [Store] self for method chaining + # + def with_persistence(persistent_store, writable, status_provider = nil) + @lock.synchronize do + @persistent_store = persistent_store + @persistent_store_writable = writable + @persistent_store_status_provider = status_provider + + # Initially use persistent store as active until memory store has data + @active_store = persistent_store + end + + self + end + + # (see LaunchDarkly::Interfaces::DataSystem::SelectorStore#selector) + def selector + @lock.synchronize do + @selector + end + end + + # + # Close the store and any persistent store if configured. + # + # @return [Exception, nil] Exception if close failed, nil otherwise + # + def close + @lock.synchronize do + return nil if @persistent_store.nil? + + begin + @persistent_store.stop if @persistent_store.respond_to?(:stop) + rescue => e + return e + end + end + + nil + end + + # + # Apply a changeset to the store. + # + # @param change_set [LaunchDarkly::Interfaces::DataSystem::ChangeSet] The changeset to apply + # @param persist [Boolean] Whether the changes should be persisted to the persistent store + # @return [void] + # + def apply(change_set, persist) + collections = changes_to_store_data(change_set.changes) + + @lock.synchronize do + begin + case change_set.intent_code + when LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL + set_basis(collections, change_set.selector, persist) + when LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES + apply_delta(collections, change_set.selector, persist) + when LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_NONE + # No-op, no changes to apply + return + end + + # Notify changeset listeners + @change_set_broadcaster.broadcast(change_set) + rescue => e + @logger.error { "[LDClient] Couldn't apply changeset: #{e.message}" } + end + end + end + + # + # Commit persists the data in the memory store to the persistent store, if configured. + # + # @return [Exception, nil] Exception if commit failed, nil otherwise + # + def commit + @lock.synchronize do + return nil unless should_persist? + + begin + # Get all data from memory store and write to persistent store + all_data = {} + [FEATURES, SEGMENTS].each do |kind| + all_data[kind] = @memory_store.all(kind) + end + @persistent_store.init(all_data) + rescue => e + return e + end + end + + nil + end + + # + # Get the currently active store for reading data. + # + # @return [LaunchDarkly::Interfaces::FeatureStore] The active store (memory or persistent) + # + def get_active_store + @lock.synchronize do + @active_store + end + end + + # + # Check if the active store is initialized. + # + # @return [Boolean] + # + def initialized? + get_active_store.initialized? + end + + # + # Get the data store status provider for the persistent store, if configured. + # + # @return [LaunchDarkly::Impl::DataStore::StatusProviderV2, nil] The data store status provider for the persistent store, if configured + # + def get_data_store_status_provider + @lock.synchronize do + @persistent_store_status_provider + end + end + + # + # Set the basis of the store. Any existing data is discarded. + # + # @param collections [Hash{Object => Hash{String => Hash}}] Hash of data kinds to collections of items + # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil] The selector + # @param persist [Boolean] Whether to persist the data + # @return [void] + # + private def set_basis(collections, selector, persist) + # Take snapshot for change detection if we have flag listeners + old_data = nil + if @flag_change_broadcaster.has_listeners? + old_data = {} + [FEATURES, SEGMENTS].each do |kind| + old_data[kind] = @memory_store.all(kind) + end + end + + ok = @memory_store.set_basis(collections) + return unless ok + + # Update dependency tracker + reset_dependency_tracker(collections) + + # Update state + @persist = persist + @selector = selector || LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + + # Switch to memory store as active + @active_store = @memory_store + + # Persist to persistent store if configured and writable + @persistent_store.init(collections) if should_persist? + + # Send change events if we had listeners + if old_data + affected_items = compute_changed_items_for_full_data_set(old_data, collections) + send_change_events(affected_items) + end + end + + # + # Apply a delta update to the store. + # + # @param collections [Hash{Object => Hash{String => Hash}}] Hash of data kinds to collections with updates + # @param selector [LaunchDarkly::Interfaces::DataSystem::Selector, nil] The selector + # @param persist [Boolean] Whether to persist the changes + # @return [void] + # + private def apply_delta(collections, selector, persist) + ok = @memory_store.apply_delta(collections) + return unless ok + + has_listeners = @flag_change_broadcaster.has_listeners? + affected_items = Set.new + + collections.each do |kind, collection| + collection.each do |key, item| + @dependency_tracker.update_dependencies_from(kind, key, item) + if has_listeners + @dependency_tracker.add_affected_items(affected_items, { kind: kind, key: key }) + end + end + end + + # Update state + @persist = persist + @selector = selector || LaunchDarkly::Interfaces::DataSystem::Selector.no_selector + + if should_persist? + collections.each do |kind, kind_data| + kind_data.each do |_key, item| + @persistent_store.upsert(kind, item) + end + end + end + + # Send change events + send_change_events(affected_items) unless affected_items.empty? + end + + # + # Returns whether data should be persisted to the persistent store. + # + # @return [Boolean] + # + private def should_persist? + @persist && !@persistent_store.nil? && @persistent_store_writable + end + + # + # Convert a list of Changes to the pre-existing format used by FeatureStore. + # + # @param changes [Array] List of changes + # @return [Hash{Object => Hash{String => Hash}}] Hash suitable for FeatureStore operations + # + private def changes_to_store_data(changes) + all_data = { + FEATURES => {}, + SEGMENTS => {}, + } + + changes.each do |change| + kind = change.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG ? FEATURES : SEGMENTS + if change.action == LaunchDarkly::Interfaces::DataSystem::ChangeType::PUT && !change.object.nil? + all_data[kind][change.key] = change.object + elsif change.action == LaunchDarkly::Interfaces::DataSystem::ChangeType::DELETE + all_data[kind][change.key] = { key: change.key, deleted: true, version: change.version } + end + end + + all_data + end + + # + # Reset dependency tracker with new full data set. + # + # @param all_data [Hash{Object => Hash{String => Hash}}] Hash of data kinds to items + # @return [void] + # + private def reset_dependency_tracker(all_data) + @dependency_tracker.reset + all_data.each do |kind, items| + items.each do |key, item| + @dependency_tracker.update_dependencies_from(kind, key, item) + end + end + end + + # + # Send flag change events for affected items. + # + # @param affected_items [Set] Set of {kind:, key:} hashes + # @return [void] + # + private def send_change_events(affected_items) + affected_items.each do |item| + if item[:kind] == FEATURES + @flag_change_broadcaster.broadcast(item[:key]) + end + end + end + + # + # Compute which items changed between old and new data sets. + # + # @param old_data [Hash{Object => Hash{String => Hash}}] Old data hash + # @param new_data [Hash{Object => Hash{String => Hash}}] New data hash + # @return [Set] Set of {kind:, key:} hashes + # + private def compute_changed_items_for_full_data_set(old_data, new_data) + affected_items = Set.new + + [FEATURES, SEGMENTS].each do |kind| + old_items = old_data[kind] || {} + new_items = new_data[kind] || {} + + # Get all keys from both old and new data + all_keys = Set.new(old_items.keys) | Set.new(new_items.keys) + + all_keys.each do |key| + old_item = old_items[key] + new_item = new_items[key] + + # If either is missing or versions differ, it's a change + if old_item.nil? || new_item.nil? + @dependency_tracker.add_affected_items(affected_items, { kind: kind, key: key }) + elsif old_item[:version] != new_item[:version] + @dependency_tracker.add_affected_items(affected_items, { kind: kind, key: key }) + end + end + end + + affected_items + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/data_system/fdv2.rb b/lib/ldclient-rb/impl/data_system/fdv2.rb new file mode 100644 index 00000000..57e91fb0 --- /dev/null +++ b/lib/ldclient-rb/impl/data_system/fdv2.rb @@ -0,0 +1,476 @@ +# frozen_string_literal: true + +require "concurrent" +require "ldclient-rb/config" +require "ldclient-rb/impl/data_system" +require "ldclient-rb/impl/data_store/store" +require "ldclient-rb/impl/data_store/feature_store_client_wrapper" +require "ldclient-rb/impl/data_source/status_provider" +require "ldclient-rb/impl/data_store/status_provider" +require "ldclient-rb/impl/broadcaster" +require "ldclient-rb/impl/repeating_task" +require "ldclient-rb/interfaces/data_system" + +module LaunchDarkly + module Impl + module DataSystem + # FDv2 is an implementation of the DataSystem interface that uses the Flag Delivery V2 protocol + # for obtaining and keeping data up-to-date. Additionally, it operates with an optional persistent + # store in read-only or read/write mode. + class FDv2 + include LaunchDarkly::Impl::DataSystem + + # Initialize a new FDv2 data system. + # + # @param sdk_key [String] The SDK key + # @param config [LaunchDarkly::Config] Configuration for initializers and synchronizers + # @param data_system_config [LaunchDarkly::DataSystemConfig] FDv2 data system configuration + def initialize(sdk_key, config, data_system_config) + @sdk_key = sdk_key + @config = config + @data_system_config = data_system_config + @logger = config.logger + @primary_synchronizer_builder = data_system_config.primary_synchronizer + @secondary_synchronizer_builder = data_system_config.secondary_synchronizer + @fdv1_fallback_synchronizer_builder = data_system_config.fdv1_fallback_synchronizer + @disabled = @config.offline? + + # Diagnostic accumulator provided by client for streaming metrics + @diagnostic_accumulator = nil + + # Shared executor for all broadcasters + @shared_executor = Concurrent::SingleThreadExecutor.new + + # Set up event listeners + @flag_change_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @logger) + @change_set_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @logger) + @data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @logger) + @data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @logger) + + recovery_listener = Object.new + recovery_listener.define_singleton_method(:update) do |data_store_status| + persistent_store_outage_recovery(data_store_status) + end + @data_store_broadcaster.add_listener(recovery_listener) + + # Create the store + @store = LaunchDarkly::Impl::DataStore::Store.new(@flag_change_broadcaster, @change_set_broadcaster, @logger) + + # Status providers + @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProviderV2.new( + @data_source_broadcaster + ) + @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProviderV2.new(nil, @data_store_broadcaster) + + # Configure persistent store if provided + if @data_system_config.data_store + @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProviderV2.new( + @data_system_config.data_store, + @data_store_broadcaster + ) + writable = @data_system_config.data_store_mode == :read_write + wrapper = LaunchDarkly::Impl::DataStore::FeatureStoreClientWrapperV2.new( + @data_system_config.data_store, + @data_store_status_provider, + @logger + ) + @store.with_persistence(wrapper, writable, @data_store_status_provider) + end + + # Threading + @stop_event = Concurrent::Event.new + @ready_event = Concurrent::Event.new + @lock = Mutex.new + @active_synchronizer = nil + @threads = [] + + # Track configuration + @configured_with_data_sources = (@data_system_config.initializers && !@data_system_config.initializers.empty?) || + !@data_system_config.primary_synchronizer.nil? + end + + # (see DataSystem#start) + def start + return @ready_event if @disabled + + @logger.warn { "[LDClient] Data system is disabled, SDK will return application-defined default values" } if @disabled + + if @disabled + @ready_event.set + return @ready_event + end + + @stop_event.reset + @ready_event.reset + + # Start the main coordination thread + main_thread = Thread.new { run_main_loop } + main_thread.name = "FDv2-main" + @threads << main_thread + + @ready_event + end + + # (see DataSystem#stop) + def stop + @stop_event.set + + @lock.synchronize do + if @active_synchronizer + begin + @active_synchronizer.stop + rescue => e + @logger.error { "[LDClient] Error stopping active data source: #{e.message}" } + end + end + end + + # Wait for all threads to complete + @threads.each do |thread| + next unless thread.alive? + + thread.join(5.0) # 5 second timeout + @logger.warn { "[LDClient] Thread #{thread.name} did not terminate in time" } if thread.alive? + end + + # Close the store + @store.close + + # Shutdown the shared executor + @shared_executor.shutdown + end + + # (see DataSystem#set_diagnostic_accumulator) + def set_diagnostic_accumulator(diagnostic_accumulator) + @diagnostic_accumulator = diagnostic_accumulator + end + + # (see DataSystem#store) + def store + @store.get_active_store + end + + # (see DataSystem#data_source_status_provider) + def data_source_status_provider + @data_source_status_provider + end + + # (see DataSystem#data_store_status_provider) + def data_store_status_provider + @data_store_status_provider + end + + # (see DataSystem#flag_change_broadcaster) + def flag_change_broadcaster + @flag_change_broadcaster + end + + # (see DataSystem#data_availability) + def data_availability + return DataAvailability::REFRESHED if @store.selector.defined? + return DataAvailability::CACHED if !@configured_with_data_sources || @store.initialized? + + DataAvailability::DEFAULTS + end + + # (see DataSystem#target_availability) + def target_availability + return DataAvailability::REFRESHED if @configured_with_data_sources + + DataAvailability::CACHED + end + + private + + # + # Main coordination loop that manages initializers and synchronizers. + # + # @return [void] + # + def run_main_loop + begin + @data_source_status_provider.update_status( + LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING, + nil + ) + + # Run initializers first + run_initializers + + # Run synchronizers + run_synchronizers + rescue => e + @logger.error { "[LDClient] Error in FDv2 main loop: #{e.message}" } + @ready_event.set + end + end + + # + # Run initializers to get initial data. + # + # @return [void] + # + def run_initializers + return unless @data_system_config.initializers + + @data_system_config.initializers.each do |initializer_builder| + return if @stop_event.set? + + begin + initializer = initializer_builder.call(@sdk_key, @config) + @logger.info { "[LDClient] Attempting to initialize via #{initializer.name}" } + + basis_result = initializer.fetch(@store) + + if basis_result.success? + basis = basis_result.value + @logger.info { "[LDClient] Initialized via #{initializer.name}" } + + # Apply the basis to the store + @store.apply(basis.change_set, basis.persist) + + # Set ready event if and only if a selector is defined for the changeset + if basis.change_set.selector && basis.change_set.selector.defined? + @ready_event.set + return + end + else + @logger.warn { "[LDClient] Initializer #{initializer.name} failed: #{basis_result.error}" } + end + rescue => e + @logger.error { "[LDClient] Initializer failed with exception: #{e.message}" } + end + end + end + + # + # Run synchronizers to keep data up-to-date. + # + # @return [void] + # + def run_synchronizers + # If no primary synchronizer configured, just set ready and return + if @primary_synchronizer_builder.nil? + @ready_event.set + return + end + + # Start synchronizer loop in a separate thread + sync_thread = Thread.new { synchronizer_loop } + sync_thread.name = "FDv2-synchronizers" + @threads << sync_thread + end + + # + # Synchronizer loop that manages primary/secondary/fallback synchronizers. + # + # @return [void] + # + def synchronizer_loop + begin + while !@stop_event.set? && @primary_synchronizer_builder + # Try primary synchronizer + begin + @lock.synchronize do + primary_sync = @primary_synchronizer_builder.call(@sdk_key, @config) + if primary_sync.respond_to?(:set_diagnostic_accumulator) && @diagnostic_accumulator + primary_sync.set_diagnostic_accumulator(@diagnostic_accumulator) + end + @active_synchronizer = primary_sync + end + + @logger.info { "[LDClient] Primary synchronizer #{@active_synchronizer.name} is starting" } + + remove_sync, fallback_v1 = consume_synchronizer_results( + @active_synchronizer, + method(:fallback_condition) + ) + + if remove_sync + @primary_synchronizer_builder = fallback_v1 ? @fdv1_fallback_synchronizer_builder : @secondary_synchronizer_builder + @secondary_synchronizer_builder = nil + + if @primary_synchronizer_builder.nil? + @logger.warn { "[LDClient] No more synchronizers available" } + @data_source_status_provider.update_status( + LaunchDarkly::Interfaces::DataSource::Status::OFF, + @data_source_status_provider.status.last_error + ) + break + end + else + @logger.info { "[LDClient] Fallback condition met" } + end + + break if @stop_event.set? + + next if @secondary_synchronizer_builder.nil? + + @lock.synchronize do + secondary_sync = @secondary_synchronizer_builder.call(@sdk_key, @config) + if secondary_sync.respond_to?(:set_diagnostic_accumulator) && @diagnostic_accumulator + secondary_sync.set_diagnostic_accumulator(@diagnostic_accumulator) + end + @logger.info { "[LDClient] Secondary synchronizer #{secondary_sync.name} is starting" } + @active_synchronizer = secondary_sync + end + + remove_sync, fallback_v1 = consume_synchronizer_results( + @active_synchronizer, + method(:recovery_condition) + ) + + if remove_sync + @secondary_synchronizer_builder = nil + @primary_synchronizer_builder = @fdv1_fallback_synchronizer_builder if fallback_v1 + + if @primary_synchronizer_builder.nil? + @logger.warn { "[LDClient] No more synchronizers available" } + @data_source_status_provider.update_status( + LaunchDarkly::Interfaces::DataSource::Status::OFF, + @data_source_status_provider.status.last_error + ) + break + end + end + + @logger.info { "[LDClient] Recovery condition met, returning to primary synchronizer" } + rescue => e + @logger.error { "[LDClient] Failed to build synchronizer: #{e.message}" } + break + end + end + rescue => e + @logger.error { "[LDClient] Error in synchronizer loop: #{e.message}" } + ensure + # Ensure we always set the ready event when exiting + @ready_event.set + @lock.synchronize do + if @active_synchronizer + @active_synchronizer.stop + @active_synchronizer = nil + end + end + end + end + + # + # Consume results from a synchronizer until a condition is met or it fails. + # + # @param synchronizer [Object] The synchronizer + # @param condition_func [Proc] Function to check if condition is met + # @return [Array(Boolean, Boolean)] [should_remove_sync, fallback_to_fdv1] + # + def consume_synchronizer_results(synchronizer, condition_func) + action_queue = Queue.new + timer = LaunchDarkly::Impl::RepeatingTask.new(10, 10, -> { action_queue.push("check") }, @logger, "FDv2-sync-cond-timer") + + # Start reader thread + sync_reader = Thread.new do + begin + synchronizer.sync(@store) do |update| + action_queue.push(update) + end + ensure + action_queue.push("quit") + end + end + sync_reader.name = "FDv2-sync-reader" + + begin + timer.start + + loop do + update = action_queue.pop + + if update.is_a?(String) + break if update == "quit" + + if update == "check" + # Check condition periodically + current_status = @data_source_status_provider.status + return [false, false] if condition_func.call(current_status) + end + next + end + + @logger.info { "[LDClient] Synchronizer #{synchronizer.name} update: #{update.state}" } + return [false, false] if @stop_event.set? + + # Handle the update + @store.apply(update.change_set, true) if update.change_set + + # Set ready event on valid update + @ready_event.set if update.state == LaunchDarkly::Interfaces::DataSource::Status::VALID + + # Update status + @data_source_status_provider.update_status(update.state, update.error) + + # Check if we should revert to FDv1 immediately + return [true, true] if update.revert_to_fdv1 + + # Check for OFF state indicating permanent failure + return [true, false] if update.state == LaunchDarkly::Interfaces::DataSource::Status::OFF + end + rescue => e + @logger.error { "[LDClient] Error consuming synchronizer results: #{e.message}" } + return [true, false] + ensure + synchronizer.stop + timer.stop + sync_reader.join(0.5) if sync_reader.alive? + end + + [true, false] + end + + # + # Determine if we should fallback to secondary synchronizer. + # + # @param status [LaunchDarkly::Interfaces::DataSource::Status] Current data source status + # @return [Boolean] true if fallback condition is met + # + def fallback_condition(status) + interrupted_at_runtime = status.state == LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED && + Time.now - status.state_since > 60 # 1 minute + cannot_initialize = status.state == LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING && + Time.now - status.state_since > 10 # 10 seconds + + interrupted_at_runtime || cannot_initialize + end + + # + # Determine if we should try to recover to primary synchronizer. + # + # @param status [LaunchDarkly::Interfaces::DataSource::Status] Current data source status + # @return [Boolean] true if recovery condition is met + # + def recovery_condition(status) + interrupted_at_runtime = status.state == LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED && + Time.now - status.state_since > 60 # 1 minute + healthy_for_too_long = status.state == LaunchDarkly::Interfaces::DataSource::Status::VALID && + Time.now - status.state_since > 300 # 5 minutes + cannot_initialize = status.state == LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING && + Time.now - status.state_since > 10 # 10 seconds + + interrupted_at_runtime || healthy_for_too_long || cannot_initialize + end + + # + # Monitor the data store status. If the store comes online and + # potentially has stale data, we should write our known state to it. + # + # @param data_store_status [LaunchDarkly::Interfaces::DataStore::Status] The store status + # @return [void] + # + def persistent_store_outage_recovery(data_store_status) + return unless data_store_status.available + return unless data_store_status.stale + + err = @store.commit + @logger.error { "[LDClient] Failed to reinitialize data store: #{err.message}" } if err + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/data_system/protocolv2.rb b/lib/ldclient-rb/impl/data_system/protocolv2.rb new file mode 100644 index 00000000..005448e5 --- /dev/null +++ b/lib/ldclient-rb/impl/data_system/protocolv2.rb @@ -0,0 +1,264 @@ +require 'json' + +module LaunchDarkly + module Impl + module DataSystem + module ProtocolV2 + # + # This module contains the protocol definitions and data types for the + # LaunchDarkly data system version 2 (FDv2). + # + + # + # DeleteObject specifies the deletion of a particular object. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + class DeleteObject + # @return [Integer] The version + attr_reader :version + + # @return [String] The object kind ({LaunchDarkly::Interfaces::DataSystem::ObjectKind}) + attr_reader :kind + + # @return [String] The key + attr_reader :key + + # + # @param version [Integer] The version + # @param kind [String] The object kind ({LaunchDarkly::Interfaces::DataSystem::ObjectKind}) + # @param key [String] The key + # + def initialize(version:, kind:, key:) + @version = version + @kind = kind + @key = key + end + + # + # Returns the event name. + # + # @return [String] + # + def name + LaunchDarkly::Interfaces::DataSystem::EventName::DELETE_OBJECT + end + + # + # Serializes the DeleteObject to a JSON-compatible hash. + # + # @return [Hash] + # + def to_h + { + version: @version, + kind: @kind, + key: @key, + } + end + + # + # Deserializes a DeleteObject from a JSON-compatible hash. + # + # @param data [Hash] The hash representation + # @return [DeleteObject] + # @raise [ArgumentError] if required fields are missing + # + def self.from_h(data) + version = data['version'] || data[:version] + kind = data['kind'] || data[:kind] + key = data['key'] || data[:key] + + raise ArgumentError, "Missing required fields in DeleteObject" if version.nil? || kind.nil? || key.nil? + + new(version: version, kind: kind, key: key) + end + end + + # + # PutObject specifies the addition of a particular object with upsert semantics. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + class PutObject + # @return [Integer] The version + attr_reader :version + + # @return [String] The object kind ({LaunchDarkly::Interfaces::DataSystem::ObjectKind}) + attr_reader :kind + + # @return [String] The key + attr_reader :key + + # @return [Hash] The object data + attr_reader :object + + # + # @param version [Integer] The version + # @param kind [String] The object kind ({LaunchDarkly::Interfaces::DataSystem::ObjectKind}) + # @param key [String] The key + # @param object [Hash] The object data + # + def initialize(version:, kind:, key:, object:) + @version = version + @kind = kind + @key = key + @object = object + end + + # + # Returns the event name. + # + # @return [String] + # + def name + LaunchDarkly::Interfaces::DataSystem::EventName::PUT_OBJECT + end + + # + # Serializes the PutObject to a JSON-compatible hash. + # + # @return [Hash] + # + def to_h + { + version: @version, + kind: @kind, + key: @key, + object: @object, + } + end + + # + # Deserializes a PutObject from a JSON-compatible hash. + # + # @param data [Hash] The hash representation + # @return [PutObject] + # @raise [ArgumentError] if required fields are missing + # + def self.from_h(data) + version = data['version'] || data[:version] + kind = data['kind'] || data[:kind] + key = data['key'] || data[:key] + object_data = data['object'] || data[:object] + + raise ArgumentError, "Missing required fields in PutObject" if version.nil? || kind.nil? || key.nil? || object_data.nil? + + new(version: version, kind: kind, key: key, object: object_data) + end + end + + # + # Goodbye represents a goodbye event. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + class Goodbye + # @return [String] The reason for goodbye + attr_reader :reason + + # @return [Boolean] Whether the goodbye is silent + attr_reader :silent + + # @return [Boolean] Whether this represents a catastrophic failure + attr_reader :catastrophe + + # + # @param reason [String] The reason for goodbye + # @param silent [Boolean] Whether the goodbye is silent + # @param catastrophe [Boolean] Whether this represents a catastrophic failure + # + def initialize(reason:, silent:, catastrophe:) + @reason = reason + @silent = silent + @catastrophe = catastrophe + end + + # + # Serializes the Goodbye to a JSON-compatible hash. + # + # @return [Hash] + # + def to_h + { + reason: @reason, + silent: @silent, + catastrophe: @catastrophe, + } + end + + # + # Deserializes a Goodbye event from a JSON-compatible hash. + # + # @param data [Hash] The hash representation + # @return [Goodbye] + # @raise [ArgumentError] if required fields are missing + # + def self.from_h(data) + reason = data['reason'] || data[:reason] + silent = data['silent'] || data[:silent] + catastrophe = data['catastrophe'] || data[:catastrophe] + + raise ArgumentError, "Missing required fields in Goodbye" if reason.nil? || silent.nil? || catastrophe.nil? + + new(reason: reason, silent: silent, catastrophe: catastrophe) + end + end + + # + # Error represents an error event. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + class Error + # @return [String] The payload ID + attr_reader :payload_id + + # @return [String] The reason for the error + attr_reader :reason + + # + # @param payload_id [String] The payload ID + # @param reason [String] The reason for the error + # + def initialize(payload_id:, reason:) + @payload_id = payload_id + @reason = reason + end + + # + # Serializes the Error to a JSON-compatible hash. + # + # @return [Hash] + # + def to_h + { + payloadId: @payload_id, + reason: @reason, + } + end + + # + # Deserializes an Error from a JSON-compatible hash. + # + # @param data [Hash] The hash representation + # @return [Error] + # @raise [ArgumentError] if required fields are missing + # + def self.from_h(data) + payload_id = data['payloadId'] || data[:payloadId] || data[:payload_id] + reason = data['reason'] || data[:reason] + + raise ArgumentError, "Missing required fields in Error" if payload_id.nil? || reason.nil? + + new(payload_id: payload_id, reason: reason) + end + end + end + end + end +end diff --git a/lib/ldclient-rb/interfaces/data_system.rb b/lib/ldclient-rb/interfaces/data_system.rb new file mode 100644 index 00000000..00fdcc23 --- /dev/null +++ b/lib/ldclient-rb/interfaces/data_system.rb @@ -0,0 +1,755 @@ +# frozen_string_literal: true + +module LaunchDarkly + module Interfaces + module DataSystem + # + # EventName represents the name of an event that can be sent by the server for FDv2. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module EventName + # Specifies that an object should be added to the data set with upsert semantics. + PUT_OBJECT = "put-object" + + # Specifies that an object should be removed from the data set. + DELETE_OBJECT = "delete-object" + + # Specifies the server's intent. + SERVER_INTENT = "server-intent" + + # Specifies that all data required to bring the existing data set to a new version has been transferred. + PAYLOAD_TRANSFERRED = "payload-transferred" + + # Keeps the connection alive. + HEARTBEAT = "heart-beat" + + # Specifies that the server is about to close the connection. + GOODBYE = "goodbye" + + # Specifies that an error occurred while serving the connection. + ERROR = "error" + end + + # + # ObjectKind represents the kind of object. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module ObjectKind + # Represents a feature flag. + FLAG = "flag" + + # Represents a segment. + SEGMENT = "segment" + end + + # + # ChangeType specifies if an object is being upserted or deleted. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module ChangeType + # Represents an object being upserted. + PUT = "put" + + # Represents an object being deleted. + DELETE = "delete" + end + + # + # IntentCode represents the various intents that can be sent by the server. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module IntentCode + # The server intends to send a full data set. + TRANSFER_FULL = "xfer-full" + + # The server intends to send only the necessary changes to bring an existing data set up-to-date. + TRANSFER_CHANGES = "xfer-changes" + + # The server intends to send no data (payload is up to date). + TRANSFER_NONE = "none" + end + + # + # DataStoreMode represents the mode of operation of a Data Store in FDV2 mode. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module DataStoreMode + # Indicates that the data store is read-only. Data will never be written back to the store by the SDK. + READ_ONLY = :read_only + + # Indicates that the data store is read-write. Data from initializers/synchronizers may be written + # to the store as necessary. + READ_WRITE = :read_write + end + + # + # Selector represents a particular snapshot of data. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class Selector + # @return [String] The state + attr_reader :state + + # @return [Integer] The version + attr_reader :version + + # + # @param state [String] The state + # @param version [Integer] The version + # + def initialize(state: "", version: 0) + @state = state + @version = version + end + + # + # Returns an empty Selector. + # + # @return [Selector] + # + def self.no_selector + Selector.new + end + + # + # Returns true if the Selector has a value. + # + # @return [Boolean] + # + def defined? + self != Selector.no_selector + end + + # + # Returns the event name for payload transfer. + # + # @return [String] + # + def name + EventName::PAYLOAD_TRANSFERRED + end + + # + # Creates a new Selector from a state string and version. + # + # @param state [String] The state + # @param version [Integer] The version + # @return [Selector] + # + def self.new_selector(state, version) + Selector.new(state: state, version: version) + end + + # + # Serializes the Selector to a Hash. + # + # @return [Hash] + # + def to_h + { + state: @state, + version: @version, + } + end + + # + # Deserializes a Selector from a Hash. + # + # @param data [Hash] The hash representation + # @return [Selector] + # @raise [ArgumentError] if required fields are missing + # + def self.from_h(data) + state = data['state'] || data[:state] + version = data['version'] || data[:version] + + raise ArgumentError, "Missing required fields in Selector" if state.nil? || version.nil? + + Selector.new(state: state, version: version) + end + + def ==(other) + other.is_a?(Selector) && @state == other.state && @version == other.version + end + + def eql?(other) + self == other + end + + def hash + [@state, @version].hash + end + end + + # + # Change represents a change to a piece of data, such as an update or deletion. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class Change + # @return [String] The action ({ChangeType}) + attr_reader :action + + # @return [String] The kind ({ObjectKind}) + attr_reader :kind + + # @return [String] The key + attr_reader :key + + # @return [Integer] The version + attr_reader :version + + # @return [Hash, nil] The object data (for PUT actions) + attr_reader :object + + # + # @param action [String] The action type ({ChangeType}) + # @param kind [String] The object kind ({ObjectKind}) + # @param key [String] The key + # @param version [Integer] The version + # @param object [Hash, nil] The object data + # + def initialize(action:, kind:, key:, version:, object: nil) + @action = action + @kind = kind + @key = key + @version = version + @object = object + end + end + + # + # ChangeSet represents a list of changes to be applied. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class ChangeSet + # @return [String] The intent code ({IntentCode}) + attr_reader :intent_code + + # @return [Array] The changes + attr_reader :changes + + # @return [Selector, nil] The selector + attr_reader :selector + + # + # @param intent_code [String] The intent code ({IntentCode}) + # @param changes [Array] The changes + # @param selector [Selector, nil] The selector + # + def initialize(intent_code:, changes:, selector:) + @intent_code = intent_code + @changes = changes + @selector = selector + end + end + + # + # Basis represents the initial payload of data that a data source can provide. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class Basis + # @return [ChangeSet] The change set + attr_reader :change_set + + # @return [Boolean] Whether to persist + attr_reader :persist + + # @return [String, nil] The environment ID + attr_reader :environment_id + + # + # @param change_set [ChangeSet] The change set + # @param persist [Boolean] Whether to persist + # @param environment_id [String, nil] The environment ID + # + def initialize(change_set:, persist:, environment_id: nil) + @change_set = change_set + @persist = persist + @environment_id = environment_id + end + end + + # + # Payload represents a payload delivered in a streaming response. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class Payload + # @return [String] The payload ID + attr_reader :id + + # @return [Integer] The target + attr_reader :target + + # @return [String] The intent code ({IntentCode}) + attr_reader :code + + # @return [String] The reason + attr_reader :reason + + # + # @param id [String] The payload ID + # @param target [Integer] The target + # @param code [String] The intent code ({IntentCode}) + # @param reason [String] The reason + # + def initialize(id:, target:, code:, reason:) + @id = id + @target = target + @code = code + @reason = reason + end + + # + # Serializes the Payload to a Hash. + # + # @return [Hash] + # + def to_h + { + id: @id, + target: @target, + intentCode: @code, + reason: @reason, + } + end + + # + # Deserializes a Payload from a Hash. + # + # @param data [Hash] The hash representation + # @return [Payload] + # @raise [ArgumentError] if required fields are missing or invalid + # + def self.from_h(data) + intent_code = data['intentCode'] || data[:intentCode] + + raise ArgumentError, "Invalid data for Payload: 'intentCode' key is missing or not a string" if intent_code.nil? || !intent_code.is_a?(String) + + Payload.new( + id: data['id'] || data[:id] || "", + target: data['target'] || data[:target] || 0, + code: intent_code, + reason: data['reason'] || data[:reason] || "" + ) + end + end + + # + # ServerIntent represents the type of change associated with the payload. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class ServerIntent + # @return [Payload] The payload + attr_reader :payload + + # + # @param payload [Payload] The payload + # + def initialize(payload:) + @payload = payload + end + + # + # Serializes the ServerIntent to a Hash. + # + # @return [Hash] + # + def to_h + { + payloads: [@payload.to_h], + } + end + + # + # Deserializes a ServerIntent from a Hash. + # + # @param data [Hash] The hash representation + # @return [ServerIntent] + # @raise [ArgumentError] if required fields are missing or invalid + # + def self.from_h(data) + payloads = data['payloads'] || data[:payloads] + + raise ArgumentError, "Invalid data for ServerIntent: 'payloads' key is missing or not an array" unless payloads.is_a?(Array) + raise ArgumentError, "Invalid data for ServerIntent: expected exactly one payload" unless payloads.length == 1 + + payload = payloads[0] + raise ArgumentError, "Invalid payload in ServerIntent: expected a hash" unless payload.is_a?(Hash) + + ServerIntent.new(payload: Payload.from_h(payload)) + end + end + + # + # ChangeSetBuilder is a helper for constructing a ChangeSet. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class ChangeSetBuilder + # @return [String, nil] The current intent ({IntentCode}) + attr_accessor :intent + + # @return [Array] The changes + attr_accessor :changes + + def initialize + @intent = nil + @changes = [] + end + + # + # Represents an intent that the current data is up-to-date and doesn't require changes. + # + # @return [ChangeSet] + # + def self.no_changes + ChangeSet.new( + intent_code: IntentCode::TRANSFER_NONE, + selector: nil, + changes: [] + ) + end + + # + # Returns an empty ChangeSet, useful for initializing without data. + # + # @param selector [Selector] The selector + # @return [ChangeSet] + # + def self.empty(selector) + ChangeSet.new( + intent_code: IntentCode::TRANSFER_FULL, + selector: selector, + changes: [] + ) + end + + # + # Begins a new change set with a given intent. + # + # @param intent [String] The intent code ({IntentCode}) + # @return [void] + # + def start(intent) + @intent = intent + @changes = [] + end + + # + # Ensures that the current ChangeSetBuilder is prepared to handle changes. + # + # @return [void] + # @raise [RuntimeError] if no server-intent has been set + # + def expect_changes + raise "changeset: cannot expect changes without a server-intent" if @intent.nil? + + return unless @intent == IntentCode::TRANSFER_NONE + + @intent = IntentCode::TRANSFER_CHANGES + end + + # + # Clears any existing changes while preserving the current intent. + # + # @return [void] + # + def reset + @changes = [] + end + + # + # Identifies a changeset with a selector and returns the completed changeset. + # + # @param selector [Selector] The selector + # @return [ChangeSet] + # @raise [RuntimeError] if no server-intent has been set + # + def finish(selector) + raise "changeset: cannot complete without a server-intent" if @intent.nil? + + changeset = ChangeSet.new( + intent_code: @intent, + selector: selector, + changes: @changes + ) + @changes = [] + + # Once a full transfer has been processed, all future changes should be + # assumed to be changes. Flag delivery can override this behavior by + # sending a new server intent to any connected stream. + @intent = IntentCode::TRANSFER_CHANGES if @intent == IntentCode::TRANSFER_FULL + + changeset + end + + # + # Adds a new object to the changeset. + # + # @param kind [String] The object kind ({ObjectKind}) + # @param key [String] The key + # @param version [Integer] The version + # @param obj [Hash] The object data + # @return [void] + # + def add_put(kind, key, version, obj) + @changes << Change.new( + action: ChangeType::PUT, + kind: kind, + key: key, + version: version, + object: obj + ) + end + + # + # Adds a deletion to the changeset. + # + # @param kind [String] The object kind ({ObjectKind}) + # @param key [String] The key + # @param version [Integer] The version + # @return [void] + # + def add_delete(kind, key, version) + @changes << Change.new( + action: ChangeType::DELETE, + kind: kind, + key: key, + version: version + ) + end + end + + # + # Update represents the results of a synchronizer's ongoing sync method. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + class Update + # @return [Symbol] The data source state ({LaunchDarkly::Interfaces::DataSource::Status}) + attr_reader :state + + # @return [ChangeSet, nil] The change set + attr_reader :change_set + + # @return [LaunchDarkly::Interfaces::DataSource::ErrorInfo, nil] Error information + attr_reader :error + + # @return [Boolean] Whether to revert to FDv1 + attr_reader :revert_to_fdv1 + + # @return [String, nil] The environment ID + attr_reader :environment_id + + # + # @param state [Symbol] The data source state ({LaunchDarkly::Interfaces::DataSource::Status}) + # @param change_set [ChangeSet, nil] The change set + # @param error [LaunchDarkly::Interfaces::DataSource::ErrorInfo, nil] Error information + # @param revert_to_fdv1 [Boolean] Whether to revert to FDv1 + # @param environment_id [String, nil] The environment ID + # + def initialize(state:, change_set: nil, error: nil, revert_to_fdv1: false, environment_id: nil) + @state = state + @change_set = change_set + @error = error + @revert_to_fdv1 = revert_to_fdv1 + @environment_id = environment_id + end + end + + # + # SelectorStore represents a component capable of providing Selectors for data retrieval. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module SelectorStore + # + # Returns a Selector object that defines the criteria for data retrieval. + # + # @return [Selector] + # + def selector + raise NotImplementedError, "#{self.class} must implement #selector" + end + end + + # + # ReadOnlyStore represents a read-only store interface for retrieving data. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module ReadOnlyStore + # + # Retrieves an item by kind and key. + # + # @param kind [LaunchDarkly::Impl::DataStore::DataKind] The data kind (e.g., LaunchDarkly::Impl::DataStore::FEATURES, LaunchDarkly::Impl::DataStore::SEGMENTS) + # @param key [String] The item key + # @return [Hash, nil] The item, or nil if not found or deleted + # + def get(kind, key) + raise NotImplementedError, "#{self.class} must implement #get" + end + + # + # Retrieves all items of a given kind. + # + # @param kind [LaunchDarkly::Impl::DataStore::DataKind] The data kind (e.g., LaunchDarkly::Impl::DataStore::FEATURES, LaunchDarkly::Impl::DataStore::SEGMENTS) + # @return [Hash] Hash of keys to items (excluding deleted items) + # + def all(kind) + raise NotImplementedError, "#{self.class} must implement #all" + end + + # + # Returns whether the store has been initialized. + # + # @return [Boolean] + # + def initialized? + raise NotImplementedError, "#{self.class} must implement #initialized?" + end + end + + # + # Initializer represents a component capable of retrieving a single data result. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module Initializer + # + # Returns the name of the initializer. + # + # @return [String] + # + def name + raise NotImplementedError, "#{self.class} must implement #name" + end + + # + # Retrieves the initial data set for the data source. + # + # @param selector_store [SelectorStore] Provides the Selector + # @return [LaunchDarkly::Result] + # + def fetch(selector_store) + raise NotImplementedError, "#{self.class} must implement #fetch" + end + end + + # + # Synchronizer represents a component capable of synchronizing data from an external source. + # + # This type is not stable, and not subject to any backwards + # compatibility guarantees or semantic versioning. It is not suitable for production usage. + # + # Do not use it. + # You have been warned. + # + module Synchronizer + # + # Returns the name of the synchronizer. + # + # @return [String] + # + def name + raise NotImplementedError, "#{self.class} must implement #name" + end + + # + # Begins the synchronization process, yielding Update objects. + # + # @param selector_store [SelectorStore] Provides the Selector + # @yieldparam update [Update] The update + # @return [void] + # + def sync(selector_store, &block) + raise NotImplementedError, "#{self.class} must implement #sync" + end + + # + # Halts the synchronization process. + # + # @return [void] + # + def stop + raise NotImplementedError, "#{self.class} must implement #stop" + end + end + end + end +end From 929995dd77ca76a03a3251674022aa16ee7e16dd Mon Sep 17 00:00:00 2001 From: Jason Bailey Date: Wed, 24 Dec 2025 10:29:18 -0600 Subject: [PATCH 2/9] Apply suggestion from @keelerm84 Co-authored-by: Matthew M. Keeler --- lib/ldclient-rb/datasystem.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ldclient-rb/datasystem.rb b/lib/ldclient-rb/datasystem.rb index fef3025f..f175bd4b 100644 --- a/lib/ldclient-rb/datasystem.rb +++ b/lib/ldclient-rb/datasystem.rb @@ -205,7 +205,7 @@ def self.custom # @return [ConfigBuilder] # def self.daemon(store) - default.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY) + custom.data_store(store, LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY) end # From bb46c608be5a83396a0598697442f52e0008b76c Mon Sep 17 00:00:00 2001 From: Jason Bailey Date: Wed, 24 Dec 2025 14:56:32 -0600 Subject: [PATCH 3/9] Apply suggestion from @keelerm84 Co-authored-by: Matthew M. Keeler --- lib/ldclient-rb/impl/data_system/fdv2.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/ldclient-rb/impl/data_system/fdv2.rb b/lib/ldclient-rb/impl/data_system/fdv2.rb index 57e91fb0..1d8105a5 100644 --- a/lib/ldclient-rb/impl/data_system/fdv2.rb +++ b/lib/ldclient-rb/impl/data_system/fdv2.rb @@ -91,11 +91,8 @@ def initialize(sdk_key, config, data_system_config) # (see DataSystem#start) def start - return @ready_event if @disabled - - @logger.warn { "[LDClient] Data system is disabled, SDK will return application-defined default values" } if @disabled - if @disabled + @logger.warn { "[LDClient] Data system is disabled, SDK will return application-defined default values" } @ready_event.set return @ready_event end From 4794483c0476d8cf85fab0b4266ba229fc006d0b Mon Sep 17 00:00:00 2001 From: Jason Bailey Date: Wed, 24 Dec 2025 14:57:21 -0600 Subject: [PATCH 4/9] Apply suggestion from @keelerm84 Co-authored-by: Matthew M. Keeler --- lib/ldclient-rb/impl/data_system/fdv2.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/ldclient-rb/impl/data_system/fdv2.rb b/lib/ldclient-rb/impl/data_system/fdv2.rb index 1d8105a5..4c78b6a6 100644 --- a/lib/ldclient-rb/impl/data_system/fdv2.rb +++ b/lib/ldclient-rb/impl/data_system/fdv2.rb @@ -343,10 +343,8 @@ def synchronizer_loop # Ensure we always set the ready event when exiting @ready_event.set @lock.synchronize do - if @active_synchronizer - @active_synchronizer.stop - @active_synchronizer = nil - end +@active_synchronizer&.stop +@active_synchronizer = nil end end end From 513fafefed4da89f1a775115fdd2dfc1adf7b81f Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 24 Dec 2025 21:17:31 +0000 Subject: [PATCH 5/9] address feedback on pr --- lib/ldclient-rb/config.rb | 10 ++--- .../{datasystem.rb => data_system.rb} | 0 .../feature_store_client_wrapper.rb | 42 +++++++++---------- .../data_store/in_memory_feature_store.rb | 8 ++-- .../impl/data_store/status_provider.rb | 10 +++++ lib/ldclient-rb/impl/data_store/store.rb | 10 ++--- lib/ldclient-rb/impl/model/serialization.rb | 2 +- 7 files changed, 46 insertions(+), 36 deletions(-) rename lib/ldclient-rb/{datasystem.rb => data_system.rb} (100%) diff --git a/lib/ldclient-rb/config.rb b/lib/ldclient-rb/config.rb index 80e59547..6e5b71f3 100644 --- a/lib/ldclient-rb/config.rb +++ b/lib/ldclient-rb/config.rb @@ -699,15 +699,15 @@ def initialize(store:, context_cache_size: nil, context_cache_time: nil, status_ # class DataSystemConfig # - # @param initializers [Array LaunchDarkly::Interfaces::DataSystem::Initializer>, nil] Array of builder procs that take Config and return an Initializer - # @param primary_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] Builder proc that takes Config and returns the primary Synchronizer - # @param secondary_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] Builder proc that takes Config and returns the secondary Synchronizer - # @param data_store_mode [Symbol] The data store mode + # @param initializers [Array LaunchDarkly::Interfaces::DataSystem::Initializer>, nil] The (optional) array of builder procs + # @param primary_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] The (optional) builder proc for primary synchronizer + # @param secondary_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] The (optional) builder proc for secondary synchronizer + # @param data_store_mode [Symbol] The (optional) data store mode # @param data_store [LaunchDarkly::Interfaces::FeatureStore, nil] The (optional) data store # @param fdv1_fallback_synchronizer [Proc(Config) => LaunchDarkly::Interfaces::DataSystem::Synchronizer, nil] # The (optional) builder proc for FDv1-compatible fallback synchronizer # - def initialize(initializers:, primary_synchronizer:, secondary_synchronizer:, + def initialize(initializers: nil, primary_synchronizer: nil, secondary_synchronizer: nil, data_store_mode: LaunchDarkly::Interfaces::DataStoreMode::READ_ONLY, data_store: nil, fdv1_fallback_synchronizer: nil) @initializers = initializers @primary_synchronizer = primary_synchronizer diff --git a/lib/ldclient-rb/datasystem.rb b/lib/ldclient-rb/data_system.rb similarity index 100% rename from lib/ldclient-rb/datasystem.rb rename to lib/ldclient-rb/data_system.rb diff --git a/lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb b/lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb index ff1cdeb4..cbee80c2 100644 --- a/lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb +++ b/lib/ldclient-rb/impl/data_store/feature_store_client_wrapper.rb @@ -26,7 +26,7 @@ def initialize(store, store_update_sink, logger) @store = store @store_update_sink = store_update_sink @logger = logger - @monitoring_enabled = does_store_support_monitoring? + @monitoring_enabled = store_supports_monitoring? # Thread synchronization @mutex = Mutex.new @@ -95,37 +95,37 @@ def monitoring_enabled? # @return [void] # private def update_availability(available) + state_changed = false + poller_to_stop = nil + @mutex.synchronize do return if available == @last_available + + state_changed = true @last_available = available + + if available + poller_to_stop = @poller + @poller = nil + elsif @poller.nil? + task = LaunchDarkly::Impl::RepeatingTask.new(0.5, 0, method(:check_availability), @logger, "LDClient/DataStoreWrapperV2#check-availability") + @poller = task + @poller.start + end end + return unless state_changed + if available @logger.warn { "[LDClient] Persistent store is available again" } + else + @logger.warn { "[LDClient] Detected persistent store unavailability; updates will be cached until it recovers" } end status = LaunchDarkly::Interfaces::DataStore::Status.new(available, true) @store_update_sink.update_status(status) - if available - @mutex.synchronize do - return if @poller.nil? - - @poller.stop - @poller = nil - end - - return - end - - @logger.warn { "[LDClient] Detected persistent store unavailability; updates will be cached until it recovers" } - - task = LaunchDarkly::Impl::RepeatingTask.new(0.5, 0, method(:check_availability), @logger, "LDClient/DataStoreWrapperV2#check-availability") - - @mutex.synchronize do - @poller = task - @poller.start - end + poller_to_stop.stop if poller_to_stop end # @@ -156,7 +156,7 @@ def monitoring_enabled? # # @return [Boolean] # - private def does_store_support_monitoring? + private def store_supports_monitoring? return false unless @store.respond_to?(:monitoring_enabled?) return false unless @store.respond_to?(:available?) diff --git a/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb index 5512f8fa..d9e7fc85 100644 --- a/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb +++ b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb @@ -13,7 +13,8 @@ module DataStore # class InMemoryFeatureStoreV2 include LaunchDarkly::Interfaces::DataSystem::ReadOnlyStore - def initialize + def initialize(logger) + @logger = logger @lock = Concurrent::ReadWriteLock.new @initialized = Concurrent::AtomicBoolean.new(false) @items = {} @@ -97,7 +98,7 @@ def apply_delta(collections) true rescue => e - LaunchDarkly::Impl.log.error { "[LDClient] Failed applying apply_delta: #{e.message}" } + @logger.error { "[LDClient] Failed applying apply_delta: #{e.message}" } false end @@ -113,8 +114,7 @@ def apply_delta(collections) collections.each do |kind, collection| items_decoded = {} collection.each do |key, item| - # Items are already in decoded format for FDv2 - items_decoded[key.to_sym] = item + items_decoded[key] = LaunchDarkly::Impl::Model.deserialize(kind, item, @logger) end all_decoded[kind] = items_decoded end diff --git a/lib/ldclient-rb/impl/data_store/status_provider.rb b/lib/ldclient-rb/impl/data_store/status_provider.rb index 506b26e3..2d63370f 100644 --- a/lib/ldclient-rb/impl/data_store/status_provider.rb +++ b/lib/ldclient-rb/impl/data_store/status_provider.rb @@ -29,6 +29,7 @@ def initialize(store, listeners) @listeners = listeners @lock = Concurrent::ReadWriteLock.new @status = LaunchDarkly::Interfaces::DataStore::Status.new(true, false) + @monitoring_enabled = store_supports_monitoring? end # (see LaunchDarkly::Interfaces::DataStore::UpdateSink#update_status) @@ -54,6 +55,15 @@ def status # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#monitoring_enabled?) def monitoring_enabled? + @monitoring_enabled + end + + # + # Determines whether the store supports monitoring. + # + # @return [Boolean] + # + private def store_supports_monitoring? return false if @store.nil? return false unless @store.respond_to?(:monitoring_enabled?) diff --git a/lib/ldclient-rb/impl/data_store/store.rb b/lib/ldclient-rb/impl/data_store/store.rb index 82b2e4dd..a4887cc1 100644 --- a/lib/ldclient-rb/impl/data_store/store.rb +++ b/lib/ldclient-rb/impl/data_store/store.rb @@ -36,7 +36,7 @@ def initialize(flag_change_broadcaster, change_set_broadcaster, logger) @persistent_store_writable = false # Source of truth for flag evaluations once initialized - @memory_store = InMemoryFeatureStoreV2.new + @memory_store = InMemoryFeatureStoreV2.new(logger) # Used to track dependencies between items in the store @dependency_tracker = LaunchDarkly::Impl::DependencyTracker.new @@ -284,7 +284,7 @@ def get_data_store_status_provider # Convert a list of Changes to the pre-existing format used by FeatureStore. # # @param changes [Array] List of changes - # @return [Hash{Object => Hash{String => Hash}}] Hash suitable for FeatureStore operations + # @return [Hash{DataKind => Hash{String => Hash}}] Hash suitable for FeatureStore operations # private def changes_to_store_data(changes) all_data = { @@ -307,7 +307,7 @@ def get_data_store_status_provider # # Reset dependency tracker with new full data set. # - # @param all_data [Hash{Object => Hash{String => Hash}}] Hash of data kinds to items + # @param all_data [Hash{DataKind => Hash{String => Hash}}] Hash of data kinds to items # @return [void] # private def reset_dependency_tracker(all_data) @@ -336,8 +336,8 @@ def get_data_store_status_provider # # Compute which items changed between old and new data sets. # - # @param old_data [Hash{Object => Hash{String => Hash}}] Old data hash - # @param new_data [Hash{Object => Hash{String => Hash}}] New data hash + # @param old_data [Hash{DataKind => Hash{String => Hash}}] Old data hash + # @param new_data [Hash{DataKind => Hash{String => Hash}}] New data hash # @return [Set] Set of {kind:, key:} hashes # private def compute_changed_items_for_full_data_set(old_data, new_data) diff --git a/lib/ldclient-rb/impl/model/serialization.rb b/lib/ldclient-rb/impl/model/serialization.rb index ef3a3389..978111d9 100644 --- a/lib/ldclient-rb/impl/model/serialization.rb +++ b/lib/ldclient-rb/impl/model/serialization.rb @@ -41,7 +41,7 @@ module Model # @return [Object] the flag or segment (or, for an unknown data kind, the data as a hash) def self.deserialize(kind, input, logger = nil) return nil if input.nil? - return input if !input.is_a?(String) && !input.is_a?(Hash) + return input unless input.is_a?(String) || input.is_a?(Hash) data = input.is_a?(Hash) ? input : JSON.parse(input, symbolize_names: true) case kind when Impl::DataStore::FEATURES From b43c904223b71ba629f9cc739eb247a7b62c449b Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 24 Dec 2025 21:58:04 +0000 Subject: [PATCH 6/9] ensure consistency for keys being strings --- lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb index d9e7fc85..53675bf4 100644 --- a/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb +++ b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb @@ -28,7 +28,7 @@ def get(kind, key) items_of_kind = @items[kind] return nil if items_of_kind.nil? - item = items_of_kind[key.to_sym] + item = items_of_kind[key] return nil if item.nil? return nil if item[:deleted] @@ -91,7 +91,7 @@ def apply_delta(collections) all_decoded.each do |kind, kind_data| items_of_kind = @items[kind] ||= {} kind_data.each do |key, item| - items_of_kind[key.to_sym] = item + items_of_kind[key] = item end end end From 467c120a7d40539007b794535ab05938d453f864 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 24 Dec 2025 22:01:01 +0000 Subject: [PATCH 7/9] fix invalid log reference --- lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb index 53675bf4..d9d65523 100644 --- a/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb +++ b/lib/ldclient-rb/impl/data_store/in_memory_feature_store.rb @@ -73,7 +73,7 @@ def set_basis(collections) true rescue => e - LaunchDarkly::Impl.log.error { "[LDClient] Failed applying set_basis: #{e.message}" } + @logger.error { "[LDClient] Failed applying set_basis: #{e.message}" } false end @@ -121,7 +121,7 @@ def apply_delta(collections) all_decoded rescue => e - LaunchDarkly::Impl.log.error { "[LDClient] Failed decoding collection: #{e.message}" } + @logger.error { "[LDClient] Failed decoding collection: #{e.message}" } nil end end From ff982dd59ecb1900fd398ecfb4d92095c8ea8136 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 24 Dec 2025 22:08:33 +0000 Subject: [PATCH 8/9] fix method names and simplify code --- .../impl/data_store/status_provider.rb | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/lib/ldclient-rb/impl/data_store/status_provider.rb b/lib/ldclient-rb/impl/data_store/status_provider.rb index 2d63370f..fda0bf08 100644 --- a/lib/ldclient-rb/impl/data_store/status_provider.rb +++ b/lib/ldclient-rb/impl/data_store/status_provider.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "concurrent" +require "forwardable" require "ldclient-rb/interfaces" module LaunchDarkly @@ -18,15 +19,18 @@ module DataStore class StatusProviderV2 include LaunchDarkly::Interfaces::DataStore::StatusProvider + extend Forwardable + def_delegators :@status_broadcaster, :add_listener, :remove_listener + # # Initialize the status provider. # # @param store [Object, nil] The feature store (may be nil for in-memory only) - # @param listeners [LaunchDarkly::Impl::Broadcaster] Broadcaster for status changes + # @param status_broadcaster [LaunchDarkly::Impl::Broadcaster] Broadcaster for status changes # - def initialize(store, listeners) + def initialize(store, status_broadcaster) @store = store - @listeners = listeners + @status_broadcaster = status_broadcaster @lock = Concurrent::ReadWriteLock.new @status = LaunchDarkly::Interfaces::DataStore::Status.new(true, false) @monitoring_enabled = store_supports_monitoring? @@ -43,7 +47,7 @@ def update_status(status) end end - @listeners.broadcast(status) if modified + @status_broadcaster.broadcast(status) if modified end # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#status) @@ -69,16 +73,6 @@ def monitoring_enabled? @store.monitoring_enabled? end - - # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#add_listener) - def add_listener(listener) - @listeners.add(listener) - end - - # (see LaunchDarkly::Interfaces::DataStore::StatusProvider#remove_listener) - def remove_listener(listener) - @listeners.remove(listener) - end end end end From 0250999a009b0afc26c70d5198eeca570f914317 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 24 Dec 2025 22:38:03 +0000 Subject: [PATCH 9/9] We should expect symbols, simplify the code and remove error with boolean value parsing --- .../impl/data_system/protocolv2.rb | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/ldclient-rb/impl/data_system/protocolv2.rb b/lib/ldclient-rb/impl/data_system/protocolv2.rb index 005448e5..faacfec6 100644 --- a/lib/ldclient-rb/impl/data_system/protocolv2.rb +++ b/lib/ldclient-rb/impl/data_system/protocolv2.rb @@ -66,9 +66,9 @@ def to_h # @raise [ArgumentError] if required fields are missing # def self.from_h(data) - version = data['version'] || data[:version] - kind = data['kind'] || data[:kind] - key = data['key'] || data[:key] + version = data[:version] + kind = data[:kind] + key = data[:key] raise ArgumentError, "Missing required fields in DeleteObject" if version.nil? || kind.nil? || key.nil? @@ -139,10 +139,10 @@ def to_h # @raise [ArgumentError] if required fields are missing # def self.from_h(data) - version = data['version'] || data[:version] - kind = data['kind'] || data[:kind] - key = data['key'] || data[:key] - object_data = data['object'] || data[:object] + version = data[:version] + kind = data[:kind] + key = data[:key] + object_data = data[:object] raise ArgumentError, "Missing required fields in PutObject" if version.nil? || kind.nil? || key.nil? || object_data.nil? @@ -198,9 +198,9 @@ def to_h # @raise [ArgumentError] if required fields are missing # def self.from_h(data) - reason = data['reason'] || data[:reason] - silent = data['silent'] || data[:silent] - catastrophe = data['catastrophe'] || data[:catastrophe] + reason = data[:reason] + silent = data[:silent] + catastrophe = data[:catastrophe] raise ArgumentError, "Missing required fields in Goodbye" if reason.nil? || silent.nil? || catastrophe.nil? @@ -250,8 +250,8 @@ def to_h # @raise [ArgumentError] if required fields are missing # def self.from_h(data) - payload_id = data['payloadId'] || data[:payloadId] || data[:payload_id] - reason = data['reason'] || data[:reason] + payload_id = data[:payloadId] + reason = data[:reason] raise ArgumentError, "Missing required fields in Error" if payload_id.nil? || reason.nil?