diff --git a/lib/ldclient-rb/events.rb b/lib/ldclient-rb/events.rb index fa233241..3447a55e 100644 --- a/lib/ldclient-rb/events.rb +++ b/lib/ldclient-rb/events.rb @@ -62,6 +62,19 @@ def record_custom_event( def record_migration_op_event(event) end + # + # Tells the event processor that all pending analytics events should be delivered as soon as possible. + # + # When the LaunchDarkly client generates analytics events (from {LaunchDarkly::LDClient#variation}, + # {LaunchDarkly::LDClient#variation_detail}, {LaunchDarkly::LDClient#identify}, or + # {LaunchDarkly::LDClient#track}), they are queued on a worker thread. The event thread normally + # sends all queued events to LaunchDarkly at regular intervals, controlled by the + # {LaunchDarkly::Config#flush_interval} option. Calling `flush` triggers a send without waiting + # for the next interval. + # + # Flushing is asynchronous, so this method will return before it is complete. However, if you + # call {LaunchDarkly::LDClient#close}, events are guaranteed to be sent before that method returns. 
+ # def flush end diff --git a/lib/ldclient-rb/impl/data_system/fdv1.rb b/lib/ldclient-rb/impl/data_system/fdv1.rb index a2fcdd13..1eb3fa25 100644 --- a/lib/ldclient-rb/impl/data_system/fdv1.rb +++ b/lib/ldclient-rb/impl/data_system/fdv1.rb @@ -38,13 +38,22 @@ def initialize(sdk_key, config) @data_store_broadcaster ) - # Wrap the data store with client wrapper (must be created before status provider) + # Preserve the original unwrapped store to avoid nested wrappers on postfork + original_store = @config.feature_store + if original_store.is_a?(LaunchDarkly::Impl::FeatureStoreClientWrapper) + original_store = original_store.instance_variable_get(:@store) + end + + # Wrap the original data store with client wrapper (must be created before status provider) @store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new( - @config.feature_store, + original_store, @data_store_update_sink, @config.logger ) + # Update config to use wrapped store so data sources can access it + @config.instance_variable_set(:@feature_store, @store_wrapper) + # Create status provider with store wrapper @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new( @store_wrapper, @@ -83,6 +92,7 @@ def start # (see DataSystem#stop) def stop @update_processor&.stop + @store_wrapper.stop @shared_executor.shutdown end @@ -156,6 +166,8 @@ def target_availability end # Polling processor + @config.logger.info { "Disabling streaming API" } + @config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } requestor = LaunchDarkly::Impl::DataSource::Requestor.new(@sdk_key, @config) LaunchDarkly::Impl::DataSource::PollingProcessor.new(@config, requestor) end diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index 825813c4..cb4662ad 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -2,20 +2,15 @@ require "ldclient-rb/impl/broadcaster" require "ldclient-rb/impl/context" require 
"ldclient-rb/impl/data_source" -require "ldclient-rb/impl/data_source/null_processor" -require "ldclient-rb/impl/data_source/polling" -require "ldclient-rb/impl/data_source/requestor" -require "ldclient-rb/impl/data_source/stream" require "ldclient-rb/impl/data_store" +require "ldclient-rb/impl/data_system/fdv1" require "ldclient-rb/impl/diagnostic_events" require "ldclient-rb/impl/evaluation_with_hook_result" require "ldclient-rb/impl/evaluator" require "ldclient-rb/impl/flag_tracker" require "ldclient-rb/impl/migrations/tracker" -require "ldclient-rb/impl/store_client_wrapper" require "ldclient-rb/impl/util" require "ldclient-rb/events" -require "ldclient-rb/in_memory_store" require "concurrent" require "concurrent/atomics" require "digest/sha1" @@ -37,6 +32,18 @@ class LDClient def_delegators :@config, :logger + # @!method flush + # Delegates to {LaunchDarkly::EventProcessorMethods#flush}. + def_delegator :@event_processor, :flush + + # @!method data_store_status_provider + # Delegates to the data system {LaunchDarkly::Impl::DataSystem#data_store_status_provider}. + # @return [LaunchDarkly::Interfaces::DataStore::StatusProvider] + # @!method data_source_status_provider + # Delegates to the data system {LaunchDarkly::Impl::DataSystem#data_source_status_provider}. + # @return [LaunchDarkly::Interfaces::DataSource::StatusProvider] + def_delegators :@data_system, :data_store_status_provider, :data_source_status_provider + # # Creates a new client instance that connects to LaunchDarkly. A custom # configuration parameter can also supplied to specify advanced options, @@ -55,13 +62,16 @@ class LDClient # def initialize(sdk_key, config = Config.default, wait_for_sec = 5) # Note that sdk_key is normally a required parameter, and a nil value would cause the SDK to - # fail in most configurations. 
However, there are some configurations where it would be OK - (offline = true, *or* we are using LDD mode or the file data source and events are disabled - so we're not connecting to any LD services) so rather than try to check for all of those - up front, we will let the constructors for the data source implementations implement this - fail-fast as appropriate, and just check here for the part regarding events. - if !config.offline? && config.send_events - raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? + # fail in most configurations. However, there are some configurations where it would be OK to + # not provide an SDK key. + # * Offline mode + # * Using LDD mode with events disabled + # * Using a custom data source (like FileData) with events disabled + if !config.offline? && sdk_key.nil? + # If the data source is nil, we create a default data source which requires the SDK key. + if config.send_events || (!config.use_ldd? && config.data_source.nil?) + raise ArgumentError, "sdk_key must not be nil" + end + end @@ -89,9 +99,10 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) # @param wait_for_sec [Float] maximum time (in seconds) to wait for initialization # def postfork(wait_for_sec = 5) - @data_source = nil + @data_system = nil @event_processor = nil @big_segment_store_manager = nil + @flag_tracker = nil start_up(wait_for_sec) end @@ -102,32 +113,22 @@ def postfork(wait_for_sec = 5) @hooks = Concurrent::Array.new(@config.hooks + plugin_hooks) - @shared_executor = Concurrent::SingleThreadExecutor.new - - data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) - store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster) - - # We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add - # some necessary logic around updates.
Unfortunately, we have code elsewhere that accesses - # the feature store through the Config object, so we need to make a new Config that uses - # the wrapped store. - @store = Impl::FeatureStoreClientWrapper.new(@config.feature_store, store_sink, @config.logger) - updated_config = @config.clone - updated_config.instance_variable_set(:@feature_store, @store) - @config = updated_config - - @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink) + # Initialize the data system (FDv1 for now, will support FDv2 in the future) + # Note: FDv1 will update @config.feature_store to use its wrapped store + @data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config) + # Components not managed by data system @big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger) @big_segment_store_status_provider = @big_segment_store_manager.status_provider - get_flag = lambda { |key| @store.get(Impl::DataStore::FEATURES, key) } - get_segment = lambda { |key| @store.get(Impl::DataStore::SEGMENTS, key) } + get_flag = lambda { |key| @data_system.store.get(Impl::DataStore::FEATURES, key) } + get_segment = lambda { |key| @data_system.store.get(Impl::DataStore::SEGMENTS, key) } get_big_segments_membership = lambda { |key| @big_segment_store_manager.get_context_membership(key) } @evaluator = LaunchDarkly::Impl::Evaluator.new(get_flag, get_segment, get_big_segments_membership, @config.logger) if !@config.offline? && @config.send_events && !@config.diagnostic_opt_out? diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(@sdk_key)) + @data_system.set_diagnostic_accumulator(diagnostic_accumulator) else diagnostic_accumulator = nil end @@ -138,38 +139,14 @@ def postfork(wait_for_sec = 5) @event_processor = EventProcessor.new(@sdk_key, @config, nil, diagnostic_accumulator) end - if @config.use_ldd? 
- @config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" } - @data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new - return # requestor and update processor are not used in this mode - end - - flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) - @flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) }) - - data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) - - # Make the update sink available on the config so that our data source factory can access the sink with a shared executor. - @config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster) - - @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink) - - data_source_or_factory = @config.data_source || self.method(:create_default_data_source) - if data_source_or_factory.respond_to? :call - # Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in - # which case they take three parameters. This will be changed in the future to use a less awkware mechanism. 
- if data_source_or_factory.arity == 3 - @data_source = data_source_or_factory.call(@sdk_key, @config, diagnostic_accumulator) - else - @data_source = data_source_or_factory.call(@sdk_key, @config) - end - else - @data_source = data_source_or_factory - end + # Create the flag tracker using the broadcaster from the data system + eval_fn = lambda { |key, context| variation(key, context, nil) } + @flag_tracker = Impl::FlagTracker.new(@data_system.flag_change_broadcaster, eval_fn) register_plugins(environment_metadata) - ready = @data_source.start + # Start the data system + ready = @data_system.start return unless wait_for_sec > 0 @@ -180,7 +157,7 @@ def postfork(wait_for_sec = 5) ok = ready.wait(wait_for_sec) if !ok @config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" } - elsif !@data_source.initialized? + elsif !initialized? @config.logger.error { "[LDClient] LaunchDarkly client initialization failed" } end end @@ -243,22 +220,6 @@ def add_hook(hook) @hooks.push(hook) end - # - # Tells the client that all pending analytics events should be delivered as soon as possible. - # - # When the LaunchDarkly client generates analytics events (from {#variation}, {#variation_detail}, - # {#identify}, or {#track}), they are queued on a worker thread. The event thread normally - # sends all queued events to LaunchDarkly at regular intervals, controlled by the - # {Config#flush_interval} option. Calling `flush` triggers a send without waiting for the - # next interval. - # - # Flushing is asynchronous, so this method will return before it is complete. However, if you - # call {#close}, events are guaranteed to be sent before that method returns. - # - def flush - @event_processor.flush - end - # # Creates a hash string that can be used by the JavaScript SDK to identify a context. # For more information, see [Secure mode](https://docs.launchdarkly.com/sdk/features/secure-mode#ruby). 
@@ -295,7 +256,7 @@ def secure_mode_hash(context) # @return [Boolean] true if the client has been initialized # def initialized? - @config.offline? || @config.use_ldd? || @data_source.initialized? + @data_system.data_availability == @data_system.target_availability end # @@ -601,7 +562,7 @@ def all_flags_state(context, options={}) return FeatureFlagsState.new(false) if @config.offline? unless initialized? - if @store.initialized? + if @data_system.store.initialized? @config.logger.warn { "Called all_flags_state before client initialization; using last known values from data store" } else @config.logger.warn { "Called all_flags_state before client initialization. Data store not available; returning empty state" } @@ -616,7 +577,7 @@ def all_flags_state(context, options={}) end begin - features = @store.all(Impl::DataStore::FEATURES) + features = @data_system.store.all(Impl::DataStore::FEATURES) rescue => exn Impl::Util.log_exception(@config.logger, "Unable to read flags for all_flags_state", exn) return FeatureFlagsState.new(false) @@ -663,11 +624,9 @@ def all_flags_state(context, options={}) # @return [void] def close @config.logger.info { "[LDClient] Closing LaunchDarkly client..." } - @data_source.stop + @data_system.stop @event_processor.stop @big_segment_store_manager.stop - @store.stop - @shared_executor.shutdown end # @@ -678,33 +637,6 @@ def close # attr_reader :big_segment_store_status_provider - # - # Returns an interface for tracking the status of a persistent data store. - # - # The {LaunchDarkly::Interfaces::DataStore::StatusProvider} has methods for - # checking whether the data store is (as far as the SDK knows) currently - # operational, tracking changes in this status, and getting cache - # statistics. These are only relevant for a persistent data store; if you - # are using an in-memory data store, then this method will return a stub - # object that provides no information. 
- # - # @return [LaunchDarkly::Interfaces::DataStore::StatusProvider] - # - attr_reader :data_store_status_provider - - # - # Returns an interface for tracking the status of the data source. - # - # The data source is the mechanism that the SDK uses to get feature flag - # configurations, such as a streaming connection (the default) or poll - # requests. The {LaunchDarkly::Interfaces::DataSource::StatusProvider} has - # methods for checking whether the data source is (as far as the SDK knows) - # currently operational and tracking changes in this status. - # - # @return [LaunchDarkly::Interfaces::DataSource::StatusProvider] - # - attr_reader :data_source_status_provider - # # Returns an interface for tracking changes in feature flag configurations. # @@ -712,23 +644,8 @@ def close # requesting notifications about feature flag changes using an event # listener model. # - attr_reader :flag_tracker - - private - - def create_default_data_source(sdk_key, config, diagnostic_accumulator) - if config.offline? - return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new - end - raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key - if config.stream? 
- Impl::DataSource::StreamProcessor.new(sdk_key, config, diagnostic_accumulator) - else - config.logger.info { "Disabling streaming API" } - config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } - requestor = Impl::DataSource::Requestor.new(sdk_key, config) - Impl::DataSource::PollingProcessor.new(config, requestor) - end + def flag_tracker + @flag_tracker end # @@ -738,7 +655,7 @@ def create_default_data_source(sdk_key, config, diagnostic_accumulator) # # @return [Array] # - def variation_with_flag(key, context, default) + private def variation_with_flag(key, context, default) evaluate_internal(key, context, default, false) end @@ -750,7 +667,7 @@ def variation_with_flag(key, context, default) # # @return [Array] # - def evaluate_internal(key, context, default, with_reasons) + private def evaluate_internal(key, context, default, with_reasons) if @config.offline? return Evaluator.error_result(EvaluationReason::ERROR_CLIENT_NOT_READY, default), nil, nil end @@ -768,7 +685,7 @@ def evaluate_internal(key, context, default, with_reasons) end unless initialized? - if @store.initialized? + if @data_system.store.initialized? 
@config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" } else @config.logger.error { "[LDClient] Client has not finished initializing; feature store unavailable, returning default value" } @@ -779,7 +696,7 @@ def evaluate_internal(key, context, default, with_reasons) end begin - feature = @store.get(Impl::DataStore::FEATURES, key) + feature = @data_system.store.get(Impl::DataStore::FEATURES, key) rescue # Ignored end diff --git a/spec/impl/data_system/fdv1_spec.rb b/spec/impl/data_system/fdv1_spec.rb index 40f8c349..e76aaf17 100644 --- a/spec/impl/data_system/fdv1_spec.rb +++ b/spec/impl/data_system/fdv1_spec.rb @@ -15,6 +15,33 @@ module DataSystem subject # Force creation of FDv1 instance expect(config.data_source_update_sink).to be_a(LaunchDarkly::Impl::DataSource::UpdateSink) end + + it "wraps the feature store with FeatureStoreClientWrapper" do + original_store = config.feature_store + subject # Force creation of FDv1 instance + + wrapped_store = config.feature_store + expect(wrapped_store).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + expect(wrapped_store.instance_variable_get(:@store)).to eq(original_store) + end + + it "avoids nested wrappers when config.feature_store is already wrapped" do + # First initialization wraps the store + original_store = config.feature_store + first_fdv1 = FDv1.new(sdk_key, config) + first_wrapper = config.feature_store + expect(first_wrapper).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + + # Second initialization (simulating postfork) should unwrap and re-wrap the original + second_fdv1 = FDv1.new(sdk_key, config) + second_wrapper = config.feature_store + expect(second_wrapper).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + + # The inner store should be the original, not the first wrapper + inner_store = second_wrapper.instance_variable_get(:@store) + expect(inner_store).to eq(original_store) + expect(inner_store).not_to 
be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + end end describe "#start" do