chore: Create FDv1 datasystem implementation #339
New file: the NullUpdateProcessor (loaded as ldclient-rb/impl/data_source/null_processor).

@@ -0,0 +1,52 @@
require 'concurrent'
require 'ldclient-rb/interfaces'

module LaunchDarkly
  module Impl
    module DataSource
      #
      # A minimal UpdateProcessor implementation used when the SDK is in offline mode
      # or daemon (LDD) mode. It does nothing except mark itself as initialized.
      #
      class NullUpdateProcessor
        include LaunchDarkly::Interfaces::DataSource

        #
        # Creates a new NullUpdateProcessor.
        #
        def initialize
          @ready = Concurrent::Event.new
        end

        #
        # Starts the data source. Since this is a null implementation, it immediately
        # sets the ready event to indicate initialization is complete.
        #
        # @return [Concurrent::Event] The ready event
        #
        def start
          @ready.set
          @ready
        end

        #
        # Stops the data source. This is a no-op for the null implementation.
        #
        # @return [void]
        #
        def stop
          # Nothing to do
        end

        #
        # Checks if the data source has been initialized.
        #
        # @return [Boolean] Always returns true since this is a null implementation
        #
        def initialized?
          true
        end
      end
    end
  end
end
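Because start sets the processor's own event before returning it, a caller waiting on the event never blocks. A minimal usage sketch (the top-level require is an assumption about load paths, not part of this PR):

require 'concurrent'
require 'ldclient-rb'   # assumed entry point; the new file itself is required as ldclient-rb/impl/data_source/null_processor

processor = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
ready = processor.start        # sets and returns the ready event immediately
ready.wait(1)                  # returns true right away; no blocking occurs
processor.initialized?         # => true, always
processor.stop                 # no-op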
Changes to the DataSystem interface module:

@@ -19,19 +19,20 @@ module DataSystem
       #
       # Starts the data system.
       #
-      # This method will return immediately. The provided event will be set when the system
+      # This method will return immediately. The returned event will be set when the system
       # has reached an initial state (either permanently failed, e.g. due to bad auth, or succeeded).
       #
-      # @param ready_event [Concurrent::Event] Event to set when initialization is complete
-      # @return [void]
+      # If called multiple times, returns the same event as the first call.
+      #
+      # @return [Concurrent::Event] Event that will be set when initialization is complete
       #
-      def start(ready_event)
+      def start
Contributor (Author) commented: I was a little torn on having the ready_event passed in vs returning it. The existing data_source public interface for its start method returns a Concurrent::Event. This means we would either need an interface layer to sync the two events between the data system and the data source, or we would need a breaking change to have the data source interface accept the event vs return it.
         raise NotImplementedError, "#{self.class} must implement #start"
       end
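For illustration of the "interface layer to sync the two events" option weighed in the review comment above, a thin adapter could create its own event and set it once the event returned by the underlying v1 data source fires. This is only a sketch; the @data_source variable and the Thread usage are assumptions, not code from this PR:

def start
  @ready ||= Concurrent::Event.new      # the data system's own ready event
  source_ready = @data_source.start     # existing v1 data sources return a Concurrent::Event
  Thread.new do
    source_ready.wait                   # wait for the underlying source to become ready...
    @ready.set                          # ...then signal the data system's event
  end
  @ready
end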
       #
       # Halts the data system. Should be called when the client is closed to stop any long running
-      # operations.
+      # operations. Makes the data system no longer usable.
       #
       # @return [void]
       #

@@ -67,18 +68,23 @@ def data_store_status_provider
       end

       #
-      # Returns an interface for tracking changes in feature flag configurations.
+      # Returns the broadcaster for flag change notifications.
+      #
+      # Consumers can use this broadcaster to build their own flag tracker
+      # or listen for flag changes directly.
       #
-      # @return [LaunchDarkly::Interfaces::FlagTracker]
+      # @return [LaunchDarkly::Impl::Broadcaster]
       #
-      def flag_tracker
-        raise NotImplementedError, "#{self.class} must implement #flag_tracker"
+      def flag_change_broadcaster
+        raise NotImplementedError, "#{self.class} must implement #flag_change_broadcaster"
       end

       #
       # Indicates what form of data is currently available.
       #
-      # @return [Symbol] One of DataAvailability constants
+      # This is calculated dynamically based on current system state.
+      #
+      # @return [Symbol] one of the {DataAvailability} constants
       #
       def data_availability
         raise NotImplementedError, "#{self.class} must implement #data_availability"
@@ -87,7 +93,7 @@ def data_availability
       #
       # Indicates the ideal form of data attainable given the current configuration.
       #
-      # @return [Symbol] One of DataAvailability constants
+      # @return [Symbol] one of the {DataAvailability} constants
       #
       def target_availability
         raise NotImplementedError, "#{self.class} must implement #target_availability"

@@ -103,18 +109,14 @@ def store
       end

       #
-      # Injects the flag value evaluation function used by the flag tracker to
-      # compute FlagValueChange events. The function signature should be
-      # (key, context) -> value.
-      #
-      # This method must be called after initialization to enable the flag tracker
-      # to compute value changes for flag change listeners.
+      # Sets the diagnostic accumulator for streaming initialization metrics.
+      # This should be called before start() to ensure metrics are collected.
       #
-      # @param eval_fn [Proc] The evaluation function
+      # @param diagnostic_accumulator [DiagnosticAccumulator] The diagnostic accumulator
       # @return [void]
       #
-      def set_flag_value_eval_fn(eval_fn)
-        raise NotImplementedError, "#{self.class} must implement #set_flag_value_eval_fn"
+      def set_diagnostic_accumulator(diagnostic_accumulator)
+        raise NotImplementedError, "#{self.class} must implement #set_diagnostic_accumulator"
       end

       #
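Taken together, the revised interface is consumed roughly like this: start the system, wait on the returned event, inspect availability, and optionally register a flag change listener on the broadcaster. A hedged sketch; the listener protocol (an #update method) and the add_listener call are assumptions based on the SDK's existing broadcaster, not part of this diff:

ready = data_system.start
if ready.wait(10)                               # wait up to 10 seconds for the initial state
  puts data_system.data_availability            # one of the DataAvailability constants
end

# Hypothetical listener; the broadcaster is assumed to call #update on each registered listener.
class FlagChangeLogger
  def update(event)
    puts "flag changed: #{event.inspect}"
  end
end
data_system.flag_change_broadcaster.add_listener(FlagChangeLogger.new)

data_system.stop                                # halts the system; it is no longer usable afterwards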
New file: the FDv1 data system implementation.

@@ -0,0 +1,165 @@
require 'concurrent'
require 'ldclient-rb/impl/broadcaster'
require 'ldclient-rb/impl/data_source'
require 'ldclient-rb/impl/data_source/null_processor'
require 'ldclient-rb/impl/data_store'
require 'ldclient-rb/impl/data_system'
require 'ldclient-rb/impl/store_client_wrapper'

module LaunchDarkly
  module Impl
    module DataSystem
      #
      # FDv1 wires the existing v1 data source and store behavior behind the
      # generic DataSystem surface.
      #
      # @see DataSystem
      #
      class FDv1
        include LaunchDarkly::Impl::DataSystem

        #
        # Creates a new FDv1 data system.
        #
        # @param sdk_key [String] The SDK key
        # @param config [LaunchDarkly::Config] The SDK configuration
        #
        def initialize(sdk_key, config)
          @sdk_key = sdk_key
          @config = config
          @shared_executor = Concurrent::SingleThreadExecutor.new

          # Set up data store plumbing
          @data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
          @data_store_update_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(
            @data_store_broadcaster
          )

          # Wrap the data store with client wrapper (must be created before status provider)
          @store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new(
            @config.feature_store,
            @data_store_update_sink,
            @config.logger
          )

          # Create status provider with store wrapper
          @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
            @store_wrapper,
            @data_store_update_sink
          )

          # Set up data source plumbing
          @data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
          @flag_change_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
          @data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(
            @store_wrapper,
            @data_source_broadcaster,
            @flag_change_broadcaster
          )
          @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(
            @data_source_broadcaster,
            @data_source_update_sink
          )

          # Ensure v1 processors can find the sink via config for status updates
          @config.data_source_update_sink = @data_source_update_sink

          # Update processor created in start()
          @update_processor = nil

          # Diagnostic accumulator provided by client for streaming metrics
          @diagnostic_accumulator = nil
        end

        # (see DataSystem#start)
        def start
          @update_processor ||= make_update_processor
          @update_processor.start
        end

        # (see DataSystem#stop)
        def stop
          @update_processor&.stop
          @shared_executor.shutdown
        end

        # (see DataSystem#store)
        def store
          @store_wrapper
        end

        # (see DataSystem#set_diagnostic_accumulator)
        def set_diagnostic_accumulator(diagnostic_accumulator)
          @diagnostic_accumulator = diagnostic_accumulator
        end

        # (see DataSystem#data_source_status_provider)
        def data_source_status_provider
          @data_source_status_provider
        end

        # (see DataSystem#data_store_status_provider)
        def data_store_status_provider
          @data_store_status_provider
        end

        # (see DataSystem#flag_change_broadcaster)
        def flag_change_broadcaster
          @flag_change_broadcaster
        end

        #
        # (see DataSystem#data_availability)
        #
        # In LDD mode, always returns CACHED for backwards compatibility,
        # even if the store is empty.
        #
        def data_availability
          return DataAvailability::DEFAULTS if @config.offline?
          return DataAvailability::REFRESHED if @update_processor && @update_processor.initialized?
          return DataAvailability::CACHED if @store_wrapper.initialized?

          DataAvailability::DEFAULTS
        end

        # (see DataSystem#target_availability)
        def target_availability
          return DataAvailability::DEFAULTS if @config.offline?

          DataAvailability::REFRESHED
        end

        #
        # Creates the appropriate update processor based on the configuration.
        #
        # @return [Object] The update processor
        #
        private def make_update_processor
          # Handle custom data source (factory or instance)
          if @config.data_source
            return @config.data_source unless @config.data_source.respond_to?(:call)

            # Factory - call with appropriate arity
            return @config.data_source.arity == 3 ?
              @config.data_source.call(@sdk_key, @config, @diagnostic_accumulator) :
              @config.data_source.call(@sdk_key, @config)
          end

          # Create default data source based on config
          return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new if @config.offline? || @config.use_ldd?

          if @config.stream?
            require 'ldclient-rb/stream'
            return LaunchDarkly::StreamProcessor.new(@sdk_key, @config, @diagnostic_accumulator)
          end

          # Polling processor
          require 'ldclient-rb/polling'
          requestor = LaunchDarkly::Requestor.new(@sdk_key, @config)
          LaunchDarkly::PollingProcessor.new(@config, requestor)
        end
      end
    end
  end
end
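A rough end-to-end sketch of how a client might drive this FDv1 implementation; the SDK key, logger setup, and requires are illustrative, not code from the PR:

require 'logger'
require 'ldclient-rb'                            # assumed to make the Impl classes loadable

config = LaunchDarkly::Config.new(logger: Logger.new($stdout))
data_system = LaunchDarkly::Impl::DataSystem::FDv1.new('sdk-key-placeholder', config)

# Optional: provide a diagnostic accumulator before start so streaming metrics are collected.
# data_system.set_diagnostic_accumulator(diagnostic_accumulator)

ready = data_system.start                        # builds the update processor on the first call
ready.wait(10)                                   # block up to 10 seconds for initial data

puts data_system.data_availability               # REFRESHED once the processor reports initialized
puts data_system.target_availability             # REFRESHED unless the config is offline

data_system.stop                                 # stops the processor and shuts down the shared executor

Note that if config.data_source is set to a callable, make_update_processor invokes it with (sdk_key, config) or (sdk_key, config, diagnostic_accumulator) depending on its arity, while a non-callable value is used directly as the update processor.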