Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lib/ldclient-rb/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ def record_custom_event(
def record_migration_op_event(event)
end

#
# Tells the event processor that all pending analytics events should be delivered as soon as possible.
#
# When the LaunchDarkly client generates analytics events (from {LaunchDarkly::LDClient#variation},
# {LaunchDarkly::LDClient#variation_detail}, {LaunchDarkly::LDClient#identify}, or
# {LaunchDarkly::LDClient#track}), they are queued on a worker thread. The event thread normally
# sends all queued events to LaunchDarkly at regular intervals, controlled by the
# {LaunchDarkly::Config#flush_interval} option. Calling `flush` triggers a send without waiting
# for the next interval.
#
# Flushing is asynchronous, so this method will return before it is complete. However, if you
# call {LaunchDarkly::LDClient#close}, events are guaranteed to be sent before that method returns.
#
def flush
end

Expand Down
16 changes: 14 additions & 2 deletions lib/ldclient-rb/impl/data_system/fdv1.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,22 @@ def initialize(sdk_key, config)
@data_store_broadcaster
)

# Wrap the data store with client wrapper (must be created before status provider)
# Preserve the original unwrapped store to avoid nested wrappers on postfork
original_store = @config.feature_store
if original_store.is_a?(LaunchDarkly::Impl::FeatureStoreClientWrapper)
original_store = original_store.instance_variable_get(:@store)
end

# Wrap the original data store with client wrapper (must be created before status provider)
@store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new(
@config.feature_store,
original_store,
@data_store_update_sink,
@config.logger
)

# Update config to use wrapped store so data sources can access it
@config.instance_variable_set(:@feature_store, @store_wrapper)

# Create status provider with store wrapper
@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
@store_wrapper,
Expand Down Expand Up @@ -83,6 +92,7 @@ def start
# (see DataSystem#stop)
def stop
@update_processor&.stop
@store_wrapper.stop
@shared_executor.shutdown
end

Expand Down Expand Up @@ -156,6 +166,8 @@ def target_availability
end

# Polling processor
@config.logger.info { "Disabling streaming API" }
@config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
requestor = LaunchDarkly::Impl::DataSource::Requestor.new(@sdk_key, @config)
LaunchDarkly::Impl::DataSource::PollingProcessor.new(@config, requestor)
end
Expand Down
179 changes: 48 additions & 131 deletions lib/ldclient-rb/ldclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@
require "ldclient-rb/impl/broadcaster"
require "ldclient-rb/impl/context"
require "ldclient-rb/impl/data_source"
require "ldclient-rb/impl/data_source/null_processor"
require "ldclient-rb/impl/data_source/polling"
require "ldclient-rb/impl/data_source/requestor"
require "ldclient-rb/impl/data_source/stream"
require "ldclient-rb/impl/data_store"
require "ldclient-rb/impl/data_system/fdv1"
require "ldclient-rb/impl/diagnostic_events"
require "ldclient-rb/impl/evaluation_with_hook_result"
require "ldclient-rb/impl/evaluator"
require "ldclient-rb/impl/flag_tracker"
require "ldclient-rb/impl/migrations/tracker"
require "ldclient-rb/impl/store_client_wrapper"
require "ldclient-rb/impl/util"
require "ldclient-rb/events"
require "ldclient-rb/in_memory_store"
require "concurrent"
require "concurrent/atomics"
require "digest/sha1"
Expand All @@ -37,6 +32,18 @@ class LDClient

def_delegators :@config, :logger

# @!method flush
# Delegates to {LaunchDarkly::EventProcessorMethods#flush}.
def_delegator :@event_processor, :flush

# @!method data_store_status_provider
# Delegates to the data system {LaunchDarkly::Impl::DataSystem#data_store_status_provider}.
# @return [LaunchDarkly::Interfaces::DataStore::StatusProvider]
# @!method data_source_status_provider
# Delegates to the data system {LaunchDarkly::Impl::DataSystem#data_source_status_provider}.
# @return [LaunchDarkly::Interfaces::DataSource::StatusProvider]
def_delegators :@data_system, :data_store_status_provider, :data_source_status_provider

#
# Creates a new client instance that connects to LaunchDarkly. A custom
# configuration parameter can also supplied to specify advanced options,
Expand All @@ -55,13 +62,16 @@ class LDClient
#
def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
# Note that sdk_key is normally a required parameter, and a nil value would cause the SDK to
# fail in most configurations. However, there are some configurations where it would be OK
# (offline = true, *or* we are using LDD mode or the file data source and events are disabled
# so we're not connecting to any LD services) so rather than try to check for all of those
# up front, we will let the constructors for the data source implementations implement this
# fail-fast as appropriate, and just check here for the part regarding events.
if !config.offline? && config.send_events
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil?
# fail in most configurations. However, there are some configurations where it would be OK to
# not provide a SDK key.
# * Offline mode
# * Using LDD mode with events disabled
# * Using a custom data source (like FileData) with events disabled
if !config.offline? && sdk_key.nil?
# If the data source is nil we create a default data source which requires the SDK key.
if config.send_events || (!config.use_ldd? && config.data_source.nil?)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conditional doesn't make sense to me.

The SDK key is required if:

  1. We are sending events, or
  2. We aren't in daemon mode AND we don't have a data source?

If we aren't in daemon mode, then we should have a data source, and that's when an SDK key is required. If there isn't a data source, what is the key for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update the comments to clarify what is happening here. Essentially if we are using a custom data source (like FileData) we don't require an SDK key. If the datasource is nil, then we create a default data source (either polling or streaming) which require an sdk key.

raise ArgumentError, "sdk_key must not be nil"
end
end

@sdk_key = sdk_key
Expand Down Expand Up @@ -89,9 +99,10 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
# @param wait_for_sec [Float] maximum time (in seconds) to wait for initialization
#
def postfork(wait_for_sec = 5)
@data_source = nil
@data_system = nil
@event_processor = nil
@big_segment_store_manager = nil
@flag_tracker = nil

start_up(wait_for_sec)
end
Expand All @@ -102,32 +113,22 @@ def postfork(wait_for_sec = 5)

@hooks = Concurrent::Array.new(@config.hooks + plugin_hooks)

@shared_executor = Concurrent::SingleThreadExecutor.new

data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster)

# We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add
# some necessary logic around updates. Unfortunately, we have code elsewhere that accesses
# the feature store through the Config object, so we need to make a new Config that uses
# the wrapped store.
@store = Impl::FeatureStoreClientWrapper.new(@config.feature_store, store_sink, @config.logger)
updated_config = @config.clone
updated_config.instance_variable_set(:@feature_store, @store)
@config = updated_config

@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink)
# Initialize the data system (FDv1 for now, will support FDv2 in the future)
# Note: FDv1 will update @config.feature_store to use its wrapped store
@data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config)

# Components not managed by data system
@big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger)
@big_segment_store_status_provider = @big_segment_store_manager.status_provider

get_flag = lambda { |key| @store.get(Impl::DataStore::FEATURES, key) }
get_segment = lambda { |key| @store.get(Impl::DataStore::SEGMENTS, key) }
get_flag = lambda { |key| @data_system.store.get(Impl::DataStore::FEATURES, key) }
get_segment = lambda { |key| @data_system.store.get(Impl::DataStore::SEGMENTS, key) }
get_big_segments_membership = lambda { |key| @big_segment_store_manager.get_context_membership(key) }
@evaluator = LaunchDarkly::Impl::Evaluator.new(get_flag, get_segment, get_big_segments_membership, @config.logger)

if !@config.offline? && @config.send_events && !@config.diagnostic_opt_out?
diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(@sdk_key))
@data_system.set_diagnostic_accumulator(diagnostic_accumulator)
else
diagnostic_accumulator = nil
end
Expand All @@ -138,38 +139,14 @@ def postfork(wait_for_sec = 5)
@event_processor = EventProcessor.new(@sdk_key, @config, nil, diagnostic_accumulator)
end

if @config.use_ldd?
@config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" }
@data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
return # requestor and update processor are not used in this mode
end

flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) })

data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)

# Make the update sink available on the config so that our data source factory can access the sink with a shared executor.
@config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster)

@data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink)

data_source_or_factory = @config.data_source || self.method(:create_default_data_source)
if data_source_or_factory.respond_to? :call
# Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in
# which case they take three parameters. This will be changed in the future to use a less awkware mechanism.
if data_source_or_factory.arity == 3
@data_source = data_source_or_factory.call(@sdk_key, @config, diagnostic_accumulator)
else
@data_source = data_source_or_factory.call(@sdk_key, @config)
end
else
@data_source = data_source_or_factory
end
# Create the flag tracker using the broadcaster from the data system
eval_fn = lambda { |key, context| variation(key, context, nil) }
@flag_tracker = Impl::FlagTracker.new(@data_system.flag_change_broadcaster, eval_fn)

register_plugins(environment_metadata)

ready = @data_source.start
# Start the data system
ready = @data_system.start

return unless wait_for_sec > 0

Expand All @@ -180,7 +157,7 @@ def postfork(wait_for_sec = 5)
ok = ready.wait(wait_for_sec)
if !ok
@config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" }
elsif !@data_source.initialized?
elsif !initialized?
@config.logger.error { "[LDClient] LaunchDarkly client initialization failed" }
end
end
Expand Down Expand Up @@ -243,22 +220,6 @@ def add_hook(hook)
@hooks.push(hook)
end

#
# Tells the client that all pending analytics events should be delivered as soon as possible.
#
# When the LaunchDarkly client generates analytics events (from {#variation}, {#variation_detail},
# {#identify}, or {#track}), they are queued on a worker thread. The event thread normally
# sends all queued events to LaunchDarkly at regular intervals, controlled by the
# {Config#flush_interval} option. Calling `flush` triggers a send without waiting for the
# next interval.
#
# Flushing is asynchronous, so this method will return before it is complete. However, if you
# call {#close}, events are guaranteed to be sent before that method returns.
#
def flush
@event_processor.flush
end

#
# Creates a hash string that can be used by the JavaScript SDK to identify a context.
# For more information, see [Secure mode](https://docs.launchdarkly.com/sdk/features/secure-mode#ruby).
Expand Down Expand Up @@ -295,7 +256,7 @@ def secure_mode_hash(context)
# @return [Boolean] true if the client has been initialized
#
def initialized?
@config.offline? || @config.use_ldd? || @data_source.initialized?
@data_system.data_availability == @data_system.target_availability
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure these lines are equivalent given our previous discussions about this? I can't remember exactly where we landed on that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should be equivalent for FDv1. This will need to be altered a bit when we introduce FDv2.

  • Offline returns and expects DEFAULTS
  • Use_Ldd returns and expects REFRESHED (because it uses null processor) ✅
  • If the data_source.Initialized is true the data_system returns REFRESHED and CACHED if false, but we expect REFRESHED

end

#
Expand Down Expand Up @@ -601,7 +562,7 @@ def all_flags_state(context, options={})
return FeatureFlagsState.new(false) if @config.offline?

unless initialized?
if @store.initialized?
if @data_system.store.initialized?
@config.logger.warn { "Called all_flags_state before client initialization; using last known values from data store" }
else
@config.logger.warn { "Called all_flags_state before client initialization. Data store not available; returning empty state" }
Expand All @@ -616,7 +577,7 @@ def all_flags_state(context, options={})
end

begin
features = @store.all(Impl::DataStore::FEATURES)
features = @data_system.store.all(Impl::DataStore::FEATURES)
rescue => exn
Impl::Util.log_exception(@config.logger, "Unable to read flags for all_flags_state", exn)
return FeatureFlagsState.new(false)
Expand Down Expand Up @@ -663,11 +624,9 @@ def all_flags_state(context, options={})
# @return [void]
def close
@config.logger.info { "[LDClient] Closing LaunchDarkly client..." }
@data_source.stop
@data_system.stop
@event_processor.stop
@big_segment_store_manager.stop
@store.stop
@shared_executor.shutdown
end

#
Expand All @@ -678,57 +637,15 @@ def close
#
attr_reader :big_segment_store_status_provider

#
# Returns an interface for tracking the status of a persistent data store.
#
# The {LaunchDarkly::Interfaces::DataStore::StatusProvider} has methods for
# checking whether the data store is (as far as the SDK knows) currently
# operational, tracking changes in this status, and getting cache
# statistics. These are only relevant for a persistent data store; if you
# are using an in-memory data store, then this method will return a stub
# object that provides no information.
#
# @return [LaunchDarkly::Interfaces::DataStore::StatusProvider]
#
attr_reader :data_store_status_provider

#
# Returns an interface for tracking the status of the data source.
#
# The data source is the mechanism that the SDK uses to get feature flag
# configurations, such as a streaming connection (the default) or poll
# requests. The {LaunchDarkly::Interfaces::DataSource::StatusProvider} has
# methods for checking whether the data source is (as far as the SDK knows)
# currently operational and tracking changes in this status.
#
# @return [LaunchDarkly::Interfaces::DataSource::StatusProvider]
#
attr_reader :data_source_status_provider

#
# Returns an interface for tracking changes in feature flag configurations.
#
# The {LaunchDarkly::Interfaces::FlagTracker} contains methods for
# requesting notifications about feature flag changes using an event
# listener model.
#
attr_reader :flag_tracker

private

def create_default_data_source(sdk_key, config, diagnostic_accumulator)
if config.offline?
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
end
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key
if config.stream?
Impl::DataSource::StreamProcessor.new(sdk_key, config, diagnostic_accumulator)
else
config.logger.info { "Disabling streaming API" }
config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
requestor = Impl::DataSource::Requestor.new(sdk_key, config)
Impl::DataSource::PollingProcessor.new(config, requestor)
end
def flag_tracker
@flag_tracker
end

#
Expand All @@ -738,7 +655,7 @@ def create_default_data_source(sdk_key, config, diagnostic_accumulator)
#
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
#
def variation_with_flag(key, context, default)
private def variation_with_flag(key, context, default)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file used a mix of per method private and all methods after the declaration on line 717 of the original file. Moving to defining per method.

evaluate_internal(key, context, default, false)
end

Expand All @@ -750,7 +667,7 @@ def variation_with_flag(key, context, default)
#
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
#
def evaluate_internal(key, context, default, with_reasons)
private def evaluate_internal(key, context, default, with_reasons)
if @config.offline?
return Evaluator.error_result(EvaluationReason::ERROR_CLIENT_NOT_READY, default), nil, nil
end
Expand All @@ -768,7 +685,7 @@ def evaluate_internal(key, context, default, with_reasons)
end

unless initialized?
if @store.initialized?
if @data_system.store.initialized?
@config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" }
else
@config.logger.error { "[LDClient] Client has not finished initializing; feature store unavailable, returning default value" }
Expand All @@ -779,7 +696,7 @@ def evaluate_internal(key, context, default, with_reasons)
end

begin
feature = @store.get(Impl::DataStore::FEATURES, key)
feature = @data_system.store.get(Impl::DataStore::FEATURES, key)
rescue
# Ignored
end
Expand Down
Loading