diff --git a/react_on_rails_pro/CHANGELOG.md b/react_on_rails_pro/CHANGELOG.md index 9e4d46c731..80883c3170 100644 --- a/react_on_rails_pro/CHANGELOG.md +++ b/react_on_rails_pro/CHANGELOG.md @@ -19,6 +19,12 @@ You can find the **package** version numbers from this repo's tags and below in _Add changes in master not yet tagged._ +### Improved +- Significantly improved streaming performance by processing React components concurrently instead of sequentially. This reduces latency and improves responsiveness when using `stream_view_containing_react_components`. + +### Added +- Added `config.concurrent_component_streaming_buffer_size` configuration option to control the memory buffer size for concurrent component streaming (defaults to 64). This allows fine-tuning of memory usage vs. performance for streaming applications. + ### Added - Added `cached_stream_react_component` helper method, similar to `cached_react_component` but for streamed components. @@ -48,6 +54,7 @@ _Add changes in master not yet tagged._ - `config.prerender_caching`, which controls caching for non-streaming components, now also controls caching for streamed components. To disable caching for an individual render, pass `internal_option(:skip_prerender_cache)`. - **Configuration Migration Required**: If you are using RSC features, you must move the RSC-related configurations from `ReactOnRails.configure` to `ReactOnRailsPro.configure` in your initializers. See the migration example in the [React on Rails CHANGELOG](https://github.com/shakacode/react_on_rails/blob/master/CHANGELOG.md#unreleased). +- Added `async` gem dependency (>= 2.6) to support concurrent streaming functionality. ## [4.0.0-rc.15] - 2025-08-11 diff --git a/react_on_rails_pro/Gemfile.lock b/react_on_rails_pro/Gemfile.lock index 64b44f4d65..d72ae72051 100644 --- a/react_on_rails_pro/Gemfile.lock +++ b/react_on_rails_pro/Gemfile.lock @@ -22,6 +22,7 @@ PATH specs: react_on_rails_pro (16.2.0.beta.4) addressable + async (>= 2.6) connection_pool execjs (~> 2.9) httpx (~> 1.5) @@ -107,6 +108,12 @@ GEM public_suffix (>= 2.0.2, < 7.0) amazing_print (1.6.0) ast (2.4.2) + async (2.27.4) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.15) base64 (0.2.0) benchmark (0.4.0) bigdecimal (3.1.9) @@ -134,6 +141,10 @@ GEM commonmarker (1.1.4-x86_64-linux) concurrent-ruby (1.3.5) connection_pool (2.5.0) + console (1.33.0) + fiber-annotation + fiber-local (~> 1.1) + json coveralls (0.8.23) json (>= 1.8, < 3) simplecov (~> 0.16.1) @@ -158,6 +169,10 @@ GEM ffi (1.17.0-arm64-darwin) ffi (1.17.0-x86_64-darwin) ffi (1.17.0-x86_64-linux-gnu) + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.1) gem-release (2.2.2) generator_spec (0.10.0) activesupport (>= 3.0.0) @@ -173,6 +188,7 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-event (1.12.1) irb (1.15.1) pp (>= 0.6.0) rdoc (>= 4.0.0) @@ -205,6 +221,7 @@ GEM marcel (1.0.4) matrix (0.4.2) method_source (1.1.0) + metrics (0.14.0) mini_mime (1.1.5) minitest (5.25.4) mize (0.4.1) @@ -411,6 +428,7 @@ GEM tins (1.33.0) bigdecimal sync + traces (0.18.1) turbolinks (5.2.1) turbolinks-source (~> 5.2) turbolinks-source (5.2.0) diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb index d1cecb14cc..63ef397f82 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb @@ -38,12 +38,90 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true # So we strip extra newlines from the template string and add a single newline response.stream.write(template_string) - @rorp_rendering_fibers.each do |fiber| - while (chunk = fiber.resume) + begin + drain_streams_concurrently + ensure + response.stream.close if close_stream_at_end + end + end + + private + + # Drains all streaming fibers concurrently using a producer-consumer pattern. + # + # Producer tasks: Each fiber drains its stream and enqueues chunks to a shared queue. + # Consumer task: Single writer dequeues chunks and writes them to the response stream. + # + # Ordering guarantees: + # - Chunks from the same component maintain their order + # - Chunks from different components may interleave based on production timing + # - The first component to produce a chunk will have it written first + # + # Memory management: + # - Uses a limited queue (configured via concurrent_component_streaming_buffer_size) + # - Producers block when the queue is full, providing backpressure + # - This prevents unbounded memory growth from fast producers + def drain_streams_concurrently + require "async" + require "async/limited_queue" + + return if @rorp_rendering_fibers.empty? + + Sync do |parent| + # To avoid memory bloat, we use a limited queue to buffer chunks in memory. + buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size + queue = Async::LimitedQueue.new(buffer_size) + + # Consumer task: Single writer dequeues and writes to response stream + writer = build_writer_task(parent: parent, queue: queue) + # Producer tasks: Each fiber drains its stream and enqueues chunks + tasks = build_producer_tasks(parent: parent, queue: queue) + + # This structure ensures that even if a producer task fails, we always + # signal the writer to stop and then wait for it to finish draining + # any remaining items from the queue before propagating the error. + begin + tasks.each(&:wait) + ensure + # `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit. + queue.close + writer.wait + end + end + end + + def build_producer_tasks(parent:, queue:) + @rorp_rendering_fibers.map do |fiber| + parent.async do + loop do + # Check if client disconnected before expensive operations + break if response.stream.closed? + + chunk = fiber.resume + break unless chunk + + # Will be blocked if the queue is full until a chunk is dequeued + queue.enqueue(chunk) + end + rescue IOError, Errno::EPIPE + # Client disconnected - stop producing + break + end + end + end + + def build_writer_task(parent:, queue:) + parent.async do + loop do + chunk = queue.dequeue + break if chunk.nil? + response.stream.write(chunk) end + rescue IOError, Errno::EPIPE + # Client disconnected - stop writing + nil end - response.stream.close if close_stream_at_end end end end diff --git a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb index 12d913a457..def61e271f 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb @@ -32,7 +32,8 @@ def self.configuration rsc_payload_generation_url_path: Configuration::DEFAULT_RSC_PAYLOAD_GENERATION_URL_PATH, rsc_bundle_js_file: Configuration::DEFAULT_RSC_BUNDLE_JS_FILE, react_client_manifest_file: Configuration::DEFAULT_REACT_CLIENT_MANIFEST_FILE, - react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE + react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE, + concurrent_component_streaming_buffer_size: Configuration::DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE ) end @@ -59,6 +60,7 @@ class Configuration # rubocop:disable Metrics/ClassLength DEFAULT_RSC_BUNDLE_JS_FILE = "rsc-bundle.js" DEFAULT_REACT_CLIENT_MANIFEST_FILE = "react-client-manifest.json" DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE = "react-server-client-manifest.json" + DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE = 64 attr_accessor :renderer_url, :renderer_password, :tracing, :server_renderer, :renderer_use_fallback_exec_js, :prerender_caching, @@ -70,6 +72,30 @@ class Configuration # rubocop:disable Metrics/ClassLength :rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file, :react_server_client_manifest_file + attr_reader :concurrent_component_streaming_buffer_size + + # Sets the buffer size for concurrent component streaming. + # + # This value controls how many chunks can be buffered in memory during + # concurrent streaming operations. When producers generate chunks faster + # than they can be written to the client, this buffer prevents unbounded + # memory growth by blocking producers when the buffer is full. + # + # Memory implications: + # - Buffer size of 64 (default) with 1KB chunks = ~64KB max memory + # - Buffer size of 64 with 1MB chunks = ~64MB max memory + # - Consider your typical chunk size when configuring this value + # + # @param value [Integer] A positive integer specifying the buffer size + # @raise [ReactOnRailsPro::Error] if value is not a positive integer + def concurrent_component_streaming_buffer_size=(value) + unless value.is_a?(Integer) && value.positive? + raise ReactOnRailsPro::Error, + "config.concurrent_component_streaming_buffer_size must be a positive integer" + end + @concurrent_component_streaming_buffer_size = value + end + def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize renderer_use_fallback_exec_js: nil, prerender_caching: nil, renderer_http_pool_size: nil, renderer_http_pool_timeout: nil, @@ -79,7 +105,9 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, renderer_request_retry_limit: nil, throw_js_errors: nil, ssr_timeout: nil, profile_server_rendering_js_code: nil, raise_non_shell_server_rendering_errors: nil, enable_rsc_support: nil, rsc_payload_generation_url_path: nil, - rsc_bundle_js_file: nil, react_client_manifest_file: nil, react_server_client_manifest_file: nil) + rsc_bundle_js_file: nil, react_client_manifest_file: nil, + react_server_client_manifest_file: nil, + concurrent_component_streaming_buffer_size: DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE) self.renderer_url = renderer_url self.renderer_password = renderer_password self.server_renderer = server_renderer @@ -105,6 +133,7 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, self.rsc_bundle_js_file = rsc_bundle_js_file self.react_client_manifest_file = react_client_manifest_file self.react_server_client_manifest_file = react_server_client_manifest_file + self.concurrent_component_streaming_buffer_size = concurrent_component_streaming_buffer_size end def setup_config_values diff --git a/react_on_rails_pro/lib/react_on_rails_pro/utils.rb b/react_on_rails_pro/lib/react_on_rails_pro/utils.rb index 952cf6cc1e..fdf93cb98e 100644 --- a/react_on_rails_pro/lib/react_on_rails_pro/utils.rb +++ b/react_on_rails_pro/lib/react_on_rails_pro/utils.rb @@ -108,7 +108,7 @@ def self.rsc_bundle_hash @rsc_bundle_hash = calc_bundle_hash(server_rsc_bundle_js_file_path) end - # Returns the hashed file name when using webpacker. Useful for creating cache keys. + # Returns the hashed file name when using Shakapacker. Useful for creating cache keys. def self.bundle_file_name(bundle_name) # bundle_js_uri_from_packer can return a file path or a HTTP URL (for files served from the dev server) # Pathname can handle both cases @@ -117,8 +117,8 @@ def self.bundle_file_name(bundle_name) pathname.basename.to_s end - # Returns the hashed file name of the server bundle when using webpacker. - # Necessary fragment-caching keys. + # Returns the hashed file name of the server bundle when using Shakapacker. + # Necessary for fragment-caching keys. def self.server_bundle_file_name return @server_bundle_hash if @server_bundle_hash && !Rails.env.development? diff --git a/react_on_rails_pro/react_on_rails_pro.gemspec b/react_on_rails_pro/react_on_rails_pro.gemspec index 59b0b6aab6..7988a1cdcd 100644 --- a/react_on_rails_pro/react_on_rails_pro.gemspec +++ b/react_on_rails_pro/react_on_rails_pro.gemspec @@ -37,6 +37,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "execjs", "~> 2.9" s.add_runtime_dependency "httpx", "~> 1.5" s.add_runtime_dependency "jwt", "~> 2.7" + s.add_runtime_dependency "async", ">= 2.6" s.add_runtime_dependency "rainbow" s.add_runtime_dependency "react_on_rails", ReactOnRails::VERSION s.add_development_dependency "bundler" diff --git a/react_on_rails_pro/spec/dummy/Gemfile.lock b/react_on_rails_pro/spec/dummy/Gemfile.lock index 3d50301024..fec02008bb 100644 --- a/react_on_rails_pro/spec/dummy/Gemfile.lock +++ b/react_on_rails_pro/spec/dummy/Gemfile.lock @@ -22,6 +22,7 @@ PATH specs: react_on_rails_pro (16.2.0.beta.4) addressable + async (>= 2.6) connection_pool execjs (~> 2.9) httpx (~> 1.5) @@ -107,6 +108,12 @@ GEM public_suffix (>= 2.0.2, < 7.0) amazing_print (1.6.0) ast (2.4.2) + async (2.34.0) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.18) base64 (0.2.0) benchmark (0.4.0) bigdecimal (3.1.9) @@ -131,6 +138,10 @@ GEM coderay (1.1.3) concurrent-ruby (1.3.5) connection_pool (2.5.0) + console (1.34.2) + fiber-annotation + fiber-local (~> 1.1) + json coveralls (0.8.23) json (>= 1.8, < 3) simplecov (~> 0.16.1) @@ -165,6 +176,9 @@ GEM ffi (1.17.0-x86_64-darwin) ffi (1.17.0-x86_64-linux-gnu) ffi (1.17.0-x86_64-linux-musl) + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage fiber-storage (1.0.0) generator_spec (0.10.0) activesupport (>= 3.0.0) @@ -184,6 +198,7 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-event (1.14.0) irb (1.15.1) pp (>= 0.6.0) rdoc (>= 4.0.0) @@ -216,6 +231,7 @@ GEM marcel (1.0.4) matrix (0.4.2) method_source (1.1.0) + metrics (0.15.0) mini_mime (1.1.5) mini_portile2 (2.8.8) minitest (5.25.4) @@ -447,6 +463,7 @@ GEM tins (1.33.0) bigdecimal sync + traces (0.18.2) turbolinks (5.2.1) turbolinks-source (~> 5.2) turbolinks-source (5.2.0) diff --git a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb index 9cae3fb11b..01b03c2762 100644 --- a/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb +++ b/react_on_rails_pro/spec/dummy/spec/helpers/react_on_rails_pro_helper_spec.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require "async" +require "async/queue" require "rails_helper" require "support/script_tag_utils" @@ -327,6 +329,7 @@ def response; end HTML end + # mock_chunks can be an Async::Queue or an Array def mock_request_and_response(mock_chunks = chunks, count: 1) # Reset connection instance variables to ensure clean state for tests ReactOnRailsPro::Request.instance_variable_set(:@connection, nil) @@ -339,9 +342,19 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) chunks_read.clear mock_streaming_response(%r{http://localhost:3800/bundles/[a-f0-9]{32}-test/render/[a-f0-9]{32}}, 200, count: count) do |yielder| - mock_chunks.each do |chunk| - chunks_read << chunk - yielder.call("#{chunk.to_json}\n") + if mock_chunks.is_a?(Async::Queue) + loop do + chunk = mock_chunks.dequeue + break if chunk.nil? + + chunks_read << chunk + yielder.call("#{chunk.to_json}\n") + end + else + mock_chunks.each do |chunk| + chunks_read << chunk + yielder.call("#{chunk.to_json}\n") + end end end end @@ -428,18 +441,36 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) allow(mocked_stream).to receive(:write) do |chunk| written_chunks << chunk - # Ensures that any chunk received is written immediately to the stream - expect(written_chunks.count).to eq(chunks_read.count) # rubocop:disable RSpec/ExpectInHook end allow(mocked_stream).to receive(:close) + allow(mocked_stream).to receive(:closed?).and_return(false) mocked_response = instance_double(ActionDispatch::Response) allow(mocked_response).to receive(:stream).and_return(mocked_stream) allow(self).to receive(:response).and_return(mocked_response) - mock_request_and_response + end + + def execute_stream_view_containing_react_components + queue = Async::Queue.new + mock_request_and_response(queue) + + Sync do |parent| + parent.async { stream_view_containing_react_components(template: template_path) } + + chunks_to_write = chunks.dup + while (chunk = chunks_to_write.shift) + queue.enqueue(chunk) + sleep 0.05 + + # Ensures that any chunk received is written immediately to the stream + expect(written_chunks.count).to eq(chunks_read.count) + end + queue.close + sleep 0.05 + end end it "writes the chunk to stream as soon as it is received" do - stream_view_containing_react_components(template: template_path) + execute_stream_view_containing_react_components expect(self).to have_received(:render_to_string).once.with(template: template_path) expect(chunks_read.count).to eq(chunks.count) expect(written_chunks.count).to eq(chunks.count) @@ -448,7 +479,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end it "prepends the rails context to the first chunk only" do - stream_view_containing_react_components(template: template_path) + execute_stream_view_containing_react_components initial_result = written_chunks.first expect(initial_result).to script_tag_be_included(rails_context_tag) @@ -464,7 +495,7 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end it "prepends the component specification tag to the first chunk only" do - stream_view_containing_react_components(template: template_path) + execute_stream_view_containing_react_components initial_result = written_chunks.first expect(initial_result).to script_tag_be_included(react_component_specification_tag) @@ -475,13 +506,14 @@ def mock_request_and_response(mock_chunks = chunks, count: 1) end it "renders the rails view content in the first chunk" do - stream_view_containing_react_components(template: template_path) + execute_stream_view_containing_react_components initial_result = written_chunks.first expect(initial_result).to include("