diff --git a/lib/ldclient-rb/impl/data_source/null_processor.rb b/lib/ldclient-rb/impl/data_source/null_processor.rb new file mode 100644 index 00000000..0929e328 --- /dev/null +++ b/lib/ldclient-rb/impl/data_source/null_processor.rb @@ -0,0 +1,52 @@ +require 'concurrent' +require 'ldclient-rb/interfaces' + +module LaunchDarkly + module Impl + module DataSource + # + # A minimal UpdateProcessor implementation used when the SDK is in offline mode + # or daemon (LDD) mode. It does nothing except mark itself as initialized. + # + class NullUpdateProcessor + include LaunchDarkly::Interfaces::DataSource + + # + # Creates a new NullUpdateProcessor. + # + def initialize + @ready = Concurrent::Event.new + end + + # + # Starts the data source. Since this is a null implementation, it immediately + # sets the ready event to indicate initialization is complete. + # + # @return [Concurrent::Event] The ready event + # + def start + @ready.set + @ready + end + + # + # Stops the data source. This is a no-op for the null implementation. + # + # @return [void] + # + def stop + # Nothing to do + end + + # + # Checks if the data source has been initialized. + # + # @return [Boolean] Always returns true since this is a null implementation + # + def initialized? + true + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/datasystem.rb b/lib/ldclient-rb/impl/data_system.rb similarity index 89% rename from lib/ldclient-rb/impl/datasystem.rb rename to lib/ldclient-rb/impl/data_system.rb index a3583d5c..53f434a1 100644 --- a/lib/ldclient-rb/impl/datasystem.rb +++ b/lib/ldclient-rb/impl/data_system.rb @@ -19,19 +19,20 @@ module DataSystem # # Starts the data system. # - # This method will return immediately. The provided event will be set when the system + # This method will return immediately. The returned event will be set when the system # has reached an initial state (either permanently failed, e.g. due to bad auth, or succeeded). # - # @param ready_event [Concurrent::Event] Event to set when initialization is complete - # @return [void] + # If called multiple times, returns the same event as the first call. + # + # @return [Concurrent::Event] Event that will be set when initialization is complete # - def start(ready_event) + def start raise NotImplementedError, "#{self.class} must implement #start" end # # Halts the data system. Should be called when the client is closed to stop any long running - # operations. + # operations. Makes the data system no longer usable. # # @return [void] # @@ -67,18 +68,23 @@ def data_store_status_provider end # - # Returns an interface for tracking changes in feature flag configurations. + # Returns the broadcaster for flag change notifications. + # + # Consumers can use this broadcaster to build their own flag tracker + # or listen for flag changes directly. # - # @return [LaunchDarkly::Interfaces::FlagTracker] + # @return [LaunchDarkly::Impl::Broadcaster] # - def flag_tracker - raise NotImplementedError, "#{self.class} must implement #flag_tracker" + def flag_change_broadcaster + raise NotImplementedError, "#{self.class} must implement #flag_change_broadcaster" end # # Indicates what form of data is currently available. # - # @return [Symbol] One of DataAvailability constants + # This is calculated dynamically based on current system state. + # + # @return [Symbol] one of the {DataAvailability} constants # def data_availability raise NotImplementedError, "#{self.class} must implement #data_availability" @@ -87,7 +93,7 @@ def data_availability # # Indicates the ideal form of data attainable given the current configuration. # - # @return [Symbol] One of DataAvailability constants + # @return [Symbol] one of the {#DataAvailability} constants # def target_availability raise NotImplementedError, "#{self.class} must implement #target_availability" @@ -103,18 +109,14 @@ def store end # - # Injects the flag value evaluation function used by the flag tracker to - # compute FlagValueChange events. The function signature should be - # (key, context) -> value. - # - # This method must be called after initialization to enable the flag tracker - # to compute value changes for flag change listeners. + # Sets the diagnostic accumulator for streaming initialization metrics. + # This should be called before start() to ensure metrics are collected. # - # @param eval_fn [Proc] The evaluation function + # @param diagnostic_accumulator [DiagnosticAccumulator] The diagnostic accumulator # @return [void] # - def set_flag_value_eval_fn(eval_fn) - raise NotImplementedError, "#{self.class} must implement #set_flag_value_eval_fn" + def set_diagnostic_accumulator(diagnostic_accumulator) + raise NotImplementedError, "#{self.class} must implement #set_diagnostic_accumulator" end # diff --git a/lib/ldclient-rb/impl/data_system/fdv1.rb b/lib/ldclient-rb/impl/data_system/fdv1.rb new file mode 100644 index 00000000..e5e20062 --- /dev/null +++ b/lib/ldclient-rb/impl/data_system/fdv1.rb @@ -0,0 +1,165 @@ +require 'concurrent' +require 'ldclient-rb/impl/broadcaster' +require 'ldclient-rb/impl/data_source' +require 'ldclient-rb/impl/data_source/null_processor' +require 'ldclient-rb/impl/data_store' +require 'ldclient-rb/impl/data_system' +require 'ldclient-rb/impl/store_client_wrapper' + +module LaunchDarkly + module Impl + module DataSystem + # + # FDv1 wires the existing v1 data source and store behavior behind the + # generic DataSystem surface. + # + # @see DataSystem + # + class FDv1 + include LaunchDarkly::Impl::DataSystem + + # + # Creates a new FDv1 data system. + # + # @param sdk_key [String] The SDK key + # @param config [LaunchDarkly::Config] The SDK configuration + # + def initialize(sdk_key, config) + @sdk_key = sdk_key + @config = config + @shared_executor = Concurrent::SingleThreadExecutor.new + + # Set up data store plumbing + @data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) + @data_store_update_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new( + @data_store_broadcaster + ) + + # Wrap the data store with client wrapper (must be created before status provider) + @store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new( + @config.feature_store, + @data_store_update_sink, + @config.logger + ) + + # Create status provider with store wrapper + @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new( + @store_wrapper, + @data_store_update_sink + ) + + # Set up data source plumbing + @data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) + @flag_change_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) + @data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new( + @store_wrapper, + @data_source_broadcaster, + @flag_change_broadcaster + ) + @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new( + @data_source_broadcaster, + @data_source_update_sink + ) + + # Ensure v1 processors can find the sink via config for status updates + @config.data_source_update_sink = @data_source_update_sink + + # Update processor created in start() + @update_processor = nil + + # Diagnostic accumulator provided by client for streaming metrics + @diagnostic_accumulator = nil + end + + # (see DataSystem#start) + def start + @update_processor ||= make_update_processor + @update_processor.start + end + + # (see DataSystem#stop) + def stop + @update_processor&.stop + @shared_executor.shutdown + end + + # (see DataSystem#store) + def store + @store_wrapper + end + + # (see DataSystem#set_diagnostic_accumulator) + def set_diagnostic_accumulator(diagnostic_accumulator) + @diagnostic_accumulator = diagnostic_accumulator + end + + # (see DataSystem#data_source_status_provider) + def data_source_status_provider + @data_source_status_provider + end + + # (see DataSystem#data_store_status_provider) + def data_store_status_provider + @data_store_status_provider + end + + # (see DataSystem#flag_change_broadcaster) + def flag_change_broadcaster + @flag_change_broadcaster + end + + # + # (see DataSystem#data_availability) + # + # In LDD mode, always returns CACHED for backwards compatibility, + # even if the store is empty. + # + def data_availability + return DataAvailability::DEFAULTS if @config.offline? + return DataAvailability::REFRESHED if @update_processor && @update_processor.initialized? + return DataAvailability::CACHED if @store_wrapper.initialized? + + DataAvailability::DEFAULTS + end + + # (see DataSystem#target_availability) + def target_availability + return DataAvailability::DEFAULTS if @config.offline? + + DataAvailability::REFRESHED + end + + # + # Creates the appropriate update processor based on the configuration. + # + # @return [Object] The update processor + # + private def make_update_processor + # Handle custom data source (factory or instance) + if @config.data_source + return @config.data_source unless @config.data_source.respond_to?(:call) + + # Factory - call with appropriate arity + return @config.data_source.arity == 3 ? + @config.data_source.call(@sdk_key, @config, @diagnostic_accumulator) : + @config.data_source.call(@sdk_key, @config) + end + + # Create default data source based on config + return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new if @config.offline? || @config.use_ldd? + + if @config.stream? + require 'ldclient-rb/stream' + return LaunchDarkly::StreamProcessor.new(@sdk_key, @config, @diagnostic_accumulator) + end + + # Polling processor + require 'ldclient-rb/polling' + requestor = LaunchDarkly::Requestor.new(@sdk_key, @config) + LaunchDarkly::PollingProcessor.new(@config, requestor) + end + end + end + end +end + diff --git a/lib/ldclient-rb/impl/integrations/test_data/test_data_source.rb b/lib/ldclient-rb/impl/integrations/test_data/test_data_source.rb index a2799a7d..4533e415 100644 --- a/lib/ldclient-rb/impl/integrations/test_data/test_data_source.rb +++ b/lib/ldclient-rb/impl/integrations/test_data/test_data_source.rb @@ -5,7 +5,6 @@ module LaunchDarkly module Impl module Integrations module TestData - # @private class TestDataSource include LaunchDarkly::Interfaces::DataSource diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index 2c986933..16f8a442 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -2,6 +2,7 @@ require "ldclient-rb/impl/broadcaster" require "ldclient-rb/impl/data_source" require "ldclient-rb/impl/data_store" +require "ldclient-rb/impl/data_source/null_processor" require "ldclient-rb/impl/diagnostic_events" require "ldclient-rb/impl/evaluator" require "ldclient-rb/impl/evaluation_with_hook_result" @@ -132,7 +133,7 @@ def postfork(wait_for_sec = 5) if @config.use_ldd? @config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" } - @data_source = NullUpdateProcessor.new + @data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new return # requestor and update processor are not used in this mode end @@ -710,7 +711,7 @@ def close def create_default_data_source(sdk_key, config, diagnostic_accumulator) if config.offline? - return NullUpdateProcessor.new + return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new end raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key if config.stream? @@ -877,23 +878,4 @@ def evaluate_internal(key, context, default, with_reasons) false end end - - # - # Used internally when the client is offline. - # @private - # - class NullUpdateProcessor - def start - e = Concurrent::Event.new - e.set - e - end - - def initialized? - true - end - - def stop - end - end end diff --git a/spec/impl/data_source/null_processor_spec.rb b/spec/impl/data_source/null_processor_spec.rb new file mode 100644 index 00000000..3249c61f --- /dev/null +++ b/spec/impl/data_source/null_processor_spec.rb @@ -0,0 +1,57 @@ +require "spec_helper" +require "ldclient-rb/impl/data_source/null_processor" + +module LaunchDarkly + module Impl + module DataSource + describe NullUpdateProcessor do + subject { NullUpdateProcessor.new } + + describe "#initialize" do + it "creates a ready event" do + expect(subject.instance_variable_get(:@ready)).to be_a(Concurrent::Event) + end + end + + describe "#start" do + it "returns a ready event that is already set" do + ready_event = subject.start + expect(ready_event).to be_a(Concurrent::Event) + expect(ready_event.set?).to be true + end + + it "returns the same event on multiple calls" do + first_event = subject.start + second_event = subject.start + + expect(second_event).to be(first_event) + end + end + + describe "#stop" do + it "does nothing and does not raise an error" do + expect { subject.stop }.not_to raise_error + end + end + + describe "#initialized?" do + it "always returns true" do + expect(subject.initialized?).to be true + end + + it "returns true even before start is called" do + processor = NullUpdateProcessor.new + expect(processor.initialized?).to be true + end + end + + describe "DataSource interface" do + it "includes the DataSource module" do + expect(subject.class.ancestors).to include(LaunchDarkly::Interfaces::DataSource) + end + end + end + end + end +end + diff --git a/spec/impl/data_system/fdv1_spec.rb b/spec/impl/data_system/fdv1_spec.rb new file mode 100644 index 00000000..9c905a2f --- /dev/null +++ b/spec/impl/data_system/fdv1_spec.rb @@ -0,0 +1,273 @@ +require "spec_helper" +require "mock_components" +require "ldclient-rb/impl/data_system/fdv1" + +module LaunchDarkly + module Impl + module DataSystem + describe FDv1 do + let(:sdk_key) { "test-sdk-key" } + let(:config) { LaunchDarkly::Config.new } + subject { FDv1.new(sdk_key, config) } + + describe "#initialize" do + it "injects data_source_update_sink into config" do + subject # Force creation of FDv1 instance + expect(config.data_source_update_sink).to be_a(LaunchDarkly::Impl::DataSource::UpdateSink) + end + end + + describe "#start" do + it "returns a Concurrent::Event" do + ready_event = subject.start + expect(ready_event).to be_a(Concurrent::Event) + end + + it "creates streaming processor by default" do + allow(LaunchDarkly::StreamProcessor).to receive(:new).and_call_original + subject.start + expect(LaunchDarkly::StreamProcessor).to have_received(:new).with(sdk_key, config, nil) + end + + context "with polling mode" do + let(:config) { LaunchDarkly::Config.new(stream: false) } + + it "creates polling processor" do + allow(LaunchDarkly::PollingProcessor).to receive(:new).and_call_original + subject.start + expect(LaunchDarkly::PollingProcessor).to have_received(:new) + end + end + + context "with offline mode" do + let(:config) { LaunchDarkly::Config.new(offline: true) } + + it "creates null processor" do + expect(LaunchDarkly::Impl::DataSource::NullUpdateProcessor).to receive(:new).and_call_original + ready_event = subject.start + expect(ready_event.set?).to be true + end + end + + context "with LDD mode" do + let(:config) { LaunchDarkly::Config.new(use_ldd: true) } + + it "creates null processor" do + expect(LaunchDarkly::Impl::DataSource::NullUpdateProcessor).to receive(:new).and_call_original + ready_event = subject.start + expect(ready_event.set?).to be true + end + end + + context "with custom data source factory" do + let(:custom_processor) { MockUpdateProcessor.new } + let(:factory) { ->(sdk_key, config, diag) { custom_processor } } + let(:config) { LaunchDarkly::Config.new(data_source: factory) } + + it "calls factory with sdk_key, config, and diagnostic_accumulator" do + expect(factory).to receive(:call).with(sdk_key, config, nil).and_return(custom_processor) + subject.start + end + + it "passes diagnostic_accumulator if set" do + diagnostic_accumulator = double("DiagnosticAccumulator") + subject.set_diagnostic_accumulator(diagnostic_accumulator) + expect(factory).to receive(:call).with(sdk_key, config, diagnostic_accumulator).and_return(custom_processor) + subject.start + end + + context "with arity 2 factory" do + let(:factory) { ->(sdk_key, config) { custom_processor } } + + it "calls factory without diagnostic_accumulator" do + expect(factory).to receive(:call).with(sdk_key, config).and_return(custom_processor) + subject.start + end + end + end + + context "with custom data source instance" do + let(:custom_processor) { MockUpdateProcessor.new } + let(:config) { LaunchDarkly::Config.new(data_source: custom_processor) } + + it "uses the instance directly" do + ready_event = subject.start + expect(ready_event).to be_a(Concurrent::Event) + end + end + + it "returns the same event on multiple calls" do + first_event = subject.start + second_event = subject.start + third_event = subject.start + + expect(second_event).to be(first_event) + expect(third_event).to be(first_event) + end + + it "does not create a new processor on subsequent calls" do + processor = MockUpdateProcessor.new + allow(subject).to receive(:make_update_processor).and_return(processor) + + subject.start + expect(subject).to have_received(:make_update_processor).once + + subject.start + subject.start + # Should still only be called once + expect(subject).to have_received(:make_update_processor).once + end + end + + describe "#stop" do + it "stops the update processor" do + processor = MockUpdateProcessor.new + allow(subject).to receive(:make_update_processor).and_return(processor) + subject.start + expect(processor).to receive(:stop) + subject.stop + end + + it "shuts down the executor" do + executor = subject.instance_variable_get(:@shared_executor) + expect(executor).to receive(:shutdown) + subject.stop + end + + it "does nothing if not started" do + expect { subject.stop }.not_to raise_error + end + end + + describe "#store" do + it "returns the store wrapper" do + expect(subject.store).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + end + end + + describe "#set_diagnostic_accumulator" do + it "stores the diagnostic accumulator" do + diagnostic_accumulator = double("DiagnosticAccumulator") + expect { subject.set_diagnostic_accumulator(diagnostic_accumulator) }.not_to raise_error + end + end + + describe "#data_source_status_provider" do + it "returns the data source status provider" do + expect(subject.data_source_status_provider).to be_a(LaunchDarkly::Impl::DataSource::StatusProvider) + end + end + + describe "#data_store_status_provider" do + it "returns the data store status provider" do + expect(subject.data_store_status_provider).to be_a(LaunchDarkly::Impl::DataStore::StatusProvider) + end + end + + describe "#flag_change_broadcaster" do + it "returns the flag change broadcaster" do + expect(subject.flag_change_broadcaster).to be_a(LaunchDarkly::Impl::Broadcaster) + end + end + + describe "#data_availability" do + context "when offline" do + let(:config) { LaunchDarkly::Config.new(offline: true) } + + it "returns DEFAULTS" do + expect(subject.data_availability).to eq(DataAvailability::DEFAULTS) + end + end + + context "when update processor is initialized" do + it "returns REFRESHED" do + processor = MockUpdateProcessor.new + allow(subject).to receive(:make_update_processor).and_return(processor) + subject.start + processor.ready.set + + expect(subject.data_availability).to eq(DataAvailability::REFRESHED) + end + end + + context "when store is initialized but processor is not" do + it "returns CACHED" do + # Initialize the store + subject.store.init({}) + + expect(subject.data_availability).to eq(DataAvailability::CACHED) + end + end + + context "when neither processor nor store are initialized" do + it "returns DEFAULTS" do + expect(subject.data_availability).to eq(DataAvailability::DEFAULTS) + end + end + + context "in LDD mode" do + let(:config) { LaunchDarkly::Config.new(use_ldd: true) } + + it "always returns REFRESHED for backwards compatibility" do + subject.start + # Returns REFRESHED even when store is empty + expect(subject.data_availability).to eq(DataAvailability::REFRESHED) + + # Still returns REFRESHED when store is initialized + subject.store.init({}) + expect(subject.data_availability).to eq(DataAvailability::REFRESHED) + end + end + end + + describe "#target_availability" do + context "when offline" do + let(:config) { LaunchDarkly::Config.new(offline: true) } + + it "returns DEFAULTS" do + expect(subject.target_availability).to eq(DataAvailability::DEFAULTS) + end + end + + context "when not offline" do + it "returns REFRESHED" do + expect(subject.target_availability).to eq(DataAvailability::REFRESHED) + end + end + + context "with LDD mode" do + let(:config) { LaunchDarkly::Config.new(use_ldd: true) } + + it "returns REFRESHED" do + expect(subject.target_availability).to eq(DataAvailability::REFRESHED) + end + end + end + + describe "integration with diagnostic accumulator" do + it "passes diagnostic accumulator to streaming processor" do + diagnostic_accumulator = double("DiagnosticAccumulator") + subject.set_diagnostic_accumulator(diagnostic_accumulator) + + expect(LaunchDarkly::StreamProcessor).to receive(:new).with(sdk_key, config, diagnostic_accumulator).and_call_original + subject.start + end + + context "with polling mode" do + let(:config) { LaunchDarkly::Config.new(stream: false) } + + it "does not pass diagnostic accumulator to polling processor" do + diagnostic_accumulator = double("DiagnosticAccumulator") + subject.set_diagnostic_accumulator(diagnostic_accumulator) + + # PollingProcessor doesn't accept diagnostic_accumulator + expect(LaunchDarkly::PollingProcessor).to receive(:new).with(config, anything).and_call_original + subject.start + end + end + end + end + end + end +end + diff --git a/spec/impl/datasystem_spec.rb b/spec/impl/datasystem_spec.rb index c35425c6..7805900b 100644 --- a/spec/impl/datasystem_spec.rb +++ b/spec/impl/datasystem_spec.rb @@ -1,5 +1,5 @@ require "spec_helper" -require "ldclient-rb/impl/datasystem" +require "ldclient-rb/impl/data_system" module LaunchDarkly module Impl @@ -13,8 +13,7 @@ module Impl end it "start raises NotImplementedError" do - ready_event = double("Event") - expect { test_instance.start(ready_event) }.to raise_error(NotImplementedError, /must implement #start/) + expect { test_instance.start }.to raise_error(NotImplementedError, /must implement #start/) end it "stop raises NotImplementedError" do @@ -29,8 +28,8 @@ module Impl expect { test_instance.data_store_status_provider }.to raise_error(NotImplementedError, /must implement #data_store_status_provider/) end - it "flag_tracker raises NotImplementedError" do - expect { test_instance.flag_tracker }.to raise_error(NotImplementedError, /must implement #flag_tracker/) + it "flag_change_broadcaster raises NotImplementedError" do + expect { test_instance.flag_change_broadcaster }.to raise_error(NotImplementedError, /must implement #flag_change_broadcaster/) end it "data_availability raises NotImplementedError" do @@ -45,8 +44,9 @@ module Impl expect { test_instance.store }.to raise_error(NotImplementedError, /must implement #store/) end - it "set_flag_value_eval_fn raises NotImplementedError" do - expect { test_instance.set_flag_value_eval_fn(nil) }.to raise_error(NotImplementedError, /must implement #set_flag_value_eval_fn/) + it "set_diagnostic_accumulator raises NotImplementedError" do + accumulator = double("DiagnosticAccumulator") + expect { test_instance.set_diagnostic_accumulator(accumulator) }.to raise_error(NotImplementedError, /must implement #set_diagnostic_accumulator/) end end diff --git a/spec/mock_components.rb b/spec/mock_components.rb index 1a4470fa..c7dc8b9b 100644 --- a/spec/mock_components.rb +++ b/spec/mock_components.rb @@ -2,6 +2,7 @@ require "ldclient-rb/impl/big_segments" require "ldclient-rb/impl/evaluator" +require "ldclient-rb/impl/data_source/null_processor" require "ldclient-rb/interfaces" def sdk_key @@ -9,7 +10,7 @@ def sdk_key end def null_data - LaunchDarkly::NullUpdateProcessor.new + LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new end def null_logger @@ -39,6 +40,37 @@ def basic_context end module LaunchDarkly + class MockUpdateProcessor + attr_reader :ready + + def initialize + @ready = Concurrent::Event.new + @started = false + @stopped = false + end + + def start + @started = true + @ready + end + + def stop + @stopped = true + end + + def initialized? + @ready.set? + end + + def started? + @started + end + + def stopped? + @stopped + end + end + class CapturingFeatureStore attr_reader :received_data