Skip to content
52 changes: 52 additions & 0 deletions lib/ldclient-rb/impl/data_source/null_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require 'concurrent'
require 'ldclient-rb/interfaces'

module LaunchDarkly
module Impl
module DataSource
#
# A minimal UpdateProcessor implementation used when the SDK is in offline mode
# or daemon (LDD) mode. It does nothing except mark itself as initialized.
#
class NullUpdateProcessor
include LaunchDarkly::Interfaces::DataSource

#
# Creates a new NullUpdateProcessor.
#
def initialize
@ready = Concurrent::Event.new
end

#
# Starts the data source. Since this is a null implementation, it immediately
# sets the ready event to indicate initialization is complete.
#
# @return [Concurrent::Event] The ready event
#
def start
@ready.set
@ready
end

#
# Stops the data source. This is a no-op for the null implementation.
#
# @return [void]
#
def stop
# Nothing to do
end

#
# Checks if the data source has been initialized.
#
# @return [Boolean] Always returns true since this is a null implementation
#
def initialized?
true
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ module DataSystem
#
# Starts the data system.
#
# This method will return immediately. The provided event will be set when the system
# This method will return immediately. The returned event will be set when the system
# has reached an initial state (either permanently failed, e.g. due to bad auth, or succeeded).
#
# @param ready_event [Concurrent::Event] Event to set when initialization is complete
# @return [void]
# If called multiple times, returns the same event as the first call.
#
# @return [Concurrent::Event] Event that will be set when initialization is complete
#
def start(ready_event)
def start
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a little torn on having the ready_event passed in vs returning it. The existing data_source public interface for its start method returns a Concurrent::Event. This means we would either need an interface layer to sync the two events between the data system and the data source, or we would need a breaking change to have the data source interface accept the event vs return it.

raise NotImplementedError, "#{self.class} must implement #start"
end

#
# Halts the data system. Should be called when the client is closed to stop any long running
# operations.
# operations. Makes the data system no longer usable.
#
# @return [void]
#
Expand Down Expand Up @@ -67,18 +68,23 @@ def data_store_status_provider
end

#
# Returns an interface for tracking changes in feature flag configurations.
# Returns the broadcaster for flag change notifications.
#
# Consumers can use this broadcaster to build their own flag tracker
# or listen for flag changes directly.
#
# @return [LaunchDarkly::Interfaces::FlagTracker]
# @return [LaunchDarkly::Impl::Broadcaster]
#
def flag_tracker
raise NotImplementedError, "#{self.class} must implement #flag_tracker"
def flag_change_broadcaster
raise NotImplementedError, "#{self.class} must implement #flag_change_broadcaster"
end

#
# Indicates what form of data is currently available.
#
# @return [Symbol] One of DataAvailability constants
# This is calculated dynamically based on current system state.
#
# @return [Symbol] one of the {DataAvailability} constants
#
def data_availability
raise NotImplementedError, "#{self.class} must implement #data_availability"
Expand All @@ -87,7 +93,7 @@ def data_availability
#
# Indicates the ideal form of data attainable given the current configuration.
#
# @return [Symbol] One of DataAvailability constants
# @return [Symbol] one of the {#DataAvailability} constants
#
def target_availability
raise NotImplementedError, "#{self.class} must implement #target_availability"
Expand All @@ -103,18 +109,14 @@ def store
end

#
# Injects the flag value evaluation function used by the flag tracker to
# compute FlagValueChange events. The function signature should be
# (key, context) -> value.
#
# This method must be called after initialization to enable the flag tracker
# to compute value changes for flag change listeners.
# Sets the diagnostic accumulator for streaming initialization metrics.
# This should be called before start() to ensure metrics are collected.
#
# @param eval_fn [Proc] The evaluation function
# @param diagnostic_accumulator [DiagnosticAccumulator] The diagnostic accumulator
# @return [void]
#
def set_flag_value_eval_fn(eval_fn)
raise NotImplementedError, "#{self.class} must implement #set_flag_value_eval_fn"
def set_diagnostic_accumulator(diagnostic_accumulator)
raise NotImplementedError, "#{self.class} must implement #set_diagnostic_accumulator"
end

#
Expand Down
165 changes: 165 additions & 0 deletions lib/ldclient-rb/impl/data_system/fdv1.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
require 'concurrent'
require 'ldclient-rb/impl/broadcaster'
require 'ldclient-rb/impl/data_source'
require 'ldclient-rb/impl/data_source/null_processor'
require 'ldclient-rb/impl/data_store'
require 'ldclient-rb/impl/data_system'
require 'ldclient-rb/impl/store_client_wrapper'

module LaunchDarkly
module Impl
module DataSystem
#
# FDv1 wires the existing v1 data source and store behavior behind the
# generic DataSystem surface.
#
# @see DataSystem
#
class FDv1
include LaunchDarkly::Impl::DataSystem

#
# Creates a new FDv1 data system.
#
# @param sdk_key [String] The SDK key
# @param config [LaunchDarkly::Config] The SDK configuration
#
def initialize(sdk_key, config)
@sdk_key = sdk_key
@config = config
@shared_executor = Concurrent::SingleThreadExecutor.new

# Set up data store plumbing
@data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@data_store_update_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(
@data_store_broadcaster
)

# Wrap the data store with client wrapper (must be created before status provider)
@store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new(
@config.feature_store,
@data_store_update_sink,
@config.logger
)

# Create status provider with store wrapper
@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
@store_wrapper,
@data_store_update_sink
)

# Set up data source plumbing
@data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@flag_change_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(
@store_wrapper,
@data_source_broadcaster,
@flag_change_broadcaster
)
@data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(
@data_source_broadcaster,
@data_source_update_sink
)

# Ensure v1 processors can find the sink via config for status updates
@config.data_source_update_sink = @data_source_update_sink

# Update processor created in start()
@update_processor = nil

# Diagnostic accumulator provided by client for streaming metrics
@diagnostic_accumulator = nil
end

# (see DataSystem#start)
def start
@update_processor ||= make_update_processor
@update_processor.start
end

# (see DataSystem#stop)
def stop
@update_processor&.stop
@shared_executor.shutdown
end

# (see DataSystem#store)
def store
@store_wrapper
end

# (see DataSystem#set_diagnostic_accumulator)
def set_diagnostic_accumulator(diagnostic_accumulator)
@diagnostic_accumulator = diagnostic_accumulator
end

# (see DataSystem#data_source_status_provider)
def data_source_status_provider
@data_source_status_provider
end

# (see DataSystem#data_store_status_provider)
def data_store_status_provider
@data_store_status_provider
end

# (see DataSystem#flag_change_broadcaster)
def flag_change_broadcaster
@flag_change_broadcaster
end

#
# (see DataSystem#data_availability)
#
# In LDD mode, always returns CACHED for backwards compatibility,
# even if the store is empty.
#
def data_availability
return DataAvailability::DEFAULTS if @config.offline?
return DataAvailability::REFRESHED if @update_processor && @update_processor.initialized?
return DataAvailability::CACHED if @store_wrapper.initialized?

DataAvailability::DEFAULTS
end

# (see DataSystem#target_availability)
def target_availability
return DataAvailability::DEFAULTS if @config.offline?

DataAvailability::REFRESHED
end

#
# Creates the appropriate update processor based on the configuration.
#
# @return [Object] The update processor
#
private def make_update_processor
# Handle custom data source (factory or instance)
if @config.data_source
return @config.data_source unless @config.data_source.respond_to?(:call)

# Factory - call with appropriate arity
return @config.data_source.arity == 3 ?
@config.data_source.call(@sdk_key, @config, @diagnostic_accumulator) :
@config.data_source.call(@sdk_key, @config)
end

# Create default data source based on config
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new if @config.offline? || @config.use_ldd?

if @config.stream?
require 'ldclient-rb/stream'
return LaunchDarkly::StreamProcessor.new(@sdk_key, @config, @diagnostic_accumulator)
end

# Polling processor
require 'ldclient-rb/polling'
requestor = LaunchDarkly::Requestor.new(@sdk_key, @config)
LaunchDarkly::PollingProcessor.new(@config, requestor)
end
end
end
end
end

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ module LaunchDarkly
module Impl
module Integrations
module TestData
# @private
class TestDataSource
include LaunchDarkly::Interfaces::DataSource

Expand Down
24 changes: 3 additions & 21 deletions lib/ldclient-rb/ldclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "ldclient-rb/impl/broadcaster"
require "ldclient-rb/impl/data_source"
require "ldclient-rb/impl/data_store"
require "ldclient-rb/impl/data_source/null_processor"
require "ldclient-rb/impl/diagnostic_events"
require "ldclient-rb/impl/evaluator"
require "ldclient-rb/impl/evaluation_with_hook_result"
Expand Down Expand Up @@ -132,7 +133,7 @@ def postfork(wait_for_sec = 5)

if @config.use_ldd?
@config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" }
@data_source = NullUpdateProcessor.new
@data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
return # requestor and update processor are not used in this mode
end

Expand Down Expand Up @@ -710,7 +711,7 @@ def close

def create_default_data_source(sdk_key, config, diagnostic_accumulator)
if config.offline?
return NullUpdateProcessor.new
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
end
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key
if config.stream?
Expand Down Expand Up @@ -877,23 +878,4 @@ def evaluate_internal(key, context, default, with_reasons)
false
end
end

#
# Used internally when the client is offline.
# @private
#
class NullUpdateProcessor
def start
e = Concurrent::Event.new
e.set
e
end

def initialized?
true
end

def stop
end
end
end
Loading