Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7b44351
add concurrent_stream_drain flag (default false)
ihabadham Aug 26, 2025
df64df1
add and bundle async runtime dependency
ihabadham Aug 26, 2025
d93d520
concurrent fiber draining via Async with single writer; add tracing l…
ihabadham Aug 26, 2025
b6bb65d
make sequential draining robust to already finished fibers
ihabadham Aug 26, 2025
bcd30c4
add default backpressure via Async::Semaphore and handle client disco…
ihabadham Aug 26, 2025
2941c29
add controller streaming specs for sequential vs concurrent, ordering…
ihabadham Aug 26, 2025
220b6d5
add a test for backpressure
ihabadham Aug 26, 2025
ada220d
refactor to correct rubocop offenses
ihabadham Aug 26, 2025
1a33a69
fix NoMethodError caused by Array.bytesize
ihabadham Aug 26, 2025
8cdd18e
add concurrent_stream_queue_capacity (default 64) and use it in strea…
ihabadham Aug 29, 2025
49782a2
add a comment explaining why semaphore.acquire is preferable to semap…
ihabadham Aug 29, 2025
6ad4755
refactor(stream): use Async::Queue#close as a final single sentinel; …
ihabadham Aug 29, 2025
effb6df
refactor: propagate streaming errors instead of rescuing
ihabadham Aug 29, 2025
d4f9d37
ci: correct rubocop offenses
ihabadham Sep 1, 2025
3e02ac9
add a simpler test for the concurrent stream_view_containing_react_co…
AbanoubGhadban Sep 3, 2025
5b60bcf
refactor streaming tests to use pure mock approach
ihabadham Sep 7, 2025
dd7ba39
DRY the tests
ihabadham Sep 7, 2025
d198af5
remove the concurrent_stream_drain config flag and always stream comp…
ihabadham Sep 7, 2025
23990cc
remove debug logging
ihabadham Sep 7, 2025
86e4db6
correct rubocop offenses
ihabadham Sep 7, 2025
36ebc0e
use async queue instead of ruby array at helper spec
AbanoubGhadban Sep 8, 2025
2b68ade
Revert "use async queue instead of ruby array at helper spec"
AbanoubGhadban Sep 8, 2025
e3a4490
Enhance helper spec to support Async::Queue for chunk processing
AbanoubGhadban Sep 8, 2025
e661b6e
Revert "Enhance helper spec to support Async::Queue for chunk process…
AbanoubGhadban Sep 8, 2025
493c97d
Refactor helper spec to utilize Async::Queue for improved chunk proce…
AbanoubGhadban Sep 8, 2025
c1b7e4b
Refactor configuration and streaming logic to use concurrent_componen…
AbanoubGhadban Sep 8, 2025
a20d590
Refactor streaming logic to remove unnecessary error handling for imp…
AbanoubGhadban Sep 8, 2025
3d4232a
pass buffer_size to LimitedQueue as a positional argument because it …
ihabadham Sep 8, 2025
dd64322
ci: correct rubocop offenses
ihabadham Sep 8, 2025
acef006
ci: avoid getting a rubocop error
ihabadham Sep 8, 2025
933c351
update CHANGELOG.md
ihabadham Sep 8, 2025
11b6772
Update react_on_rails to 16.0.1.rc.4 to fix yanked version issue
ihabadham Sep 24, 2025
3ef1bec
remove accidently pushed Gemfile.local.backup
ihabadham Sep 24, 2025
7715f6c
git ignore .claude/
ihabadham Sep 24, 2025
2e3a5c7
Fix ReactOnRails::PackerUtils.using_packer? compatibility with react_…
ihabadham Sep 24, 2025
1836b3b
Remove redundant .claude/ entry from react_on_rails_pro/.gitignore
ihabadham Nov 13, 2025
bc3c7e6
Add missing validation call for concurrent_component_streaming_buffer…
ihabadham Nov 13, 2025
864cc6c
Validate buffer size as Integer instead of Numeric
ihabadham Nov 13, 2025
0815ce0
Improve concurrent streaming error handling and configuration
justin808 Nov 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions react_on_rails_pro/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ You can find the **package** version numbers from this repo's tags and below in

_Add changes in master not yet tagged._

### Improved
- Significantly improved streaming performance by processing React components concurrently instead of sequentially. This reduces latency and improves responsiveness when using `stream_view_containing_react_components`.

### Added
- Added `config.concurrent_component_streaming_buffer_size` configuration option to control the memory buffer size for concurrent component streaming (defaults to 64). This allows fine-tuning of memory usage vs. performance for streaming applications.

### Added

- Added `cached_stream_react_component` helper method, similar to `cached_react_component` but for streamed components.
Expand Down Expand Up @@ -48,6 +54,7 @@ _Add changes in master not yet tagged._

- `config.prerender_caching`, which controls caching for non-streaming components, now also controls caching for streamed components. To disable caching for an individual render, pass `internal_option(:skip_prerender_cache)`.
- **Configuration Migration Required**: If you are using RSC features, you must move the RSC-related configurations from `ReactOnRails.configure` to `ReactOnRailsPro.configure` in your initializers. See the migration example in the [React on Rails CHANGELOG](https://github.com/shakacode/react_on_rails/blob/master/CHANGELOG.md#unreleased).
- Added `async` gem dependency (>= 2.6) to support concurrent streaming functionality.

## [4.0.0-rc.15] - 2025-08-11

Expand Down
18 changes: 18 additions & 0 deletions react_on_rails_pro/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ PATH
specs:
react_on_rails_pro (16.2.0.beta.4)
addressable
async (>= 2.6)
connection_pool
execjs (~> 2.9)
httpx (~> 1.5)
Expand Down Expand Up @@ -107,6 +108,12 @@ GEM
public_suffix (>= 2.0.2, < 7.0)
amazing_print (1.6.0)
ast (2.4.2)
async (2.27.4)
console (~> 1.29)
fiber-annotation
io-event (~> 1.11)
metrics (~> 0.12)
traces (~> 0.15)
base64 (0.2.0)
benchmark (0.4.0)
bigdecimal (3.1.9)
Expand Down Expand Up @@ -134,6 +141,10 @@ GEM
commonmarker (1.1.4-x86_64-linux)
concurrent-ruby (1.3.5)
connection_pool (2.5.0)
console (1.33.0)
fiber-annotation
fiber-local (~> 1.1)
json
coveralls (0.8.23)
json (>= 1.8, < 3)
simplecov (~> 0.16.1)
Expand All @@ -158,6 +169,10 @@ GEM
ffi (1.17.0-arm64-darwin)
ffi (1.17.0-x86_64-darwin)
ffi (1.17.0-x86_64-linux-gnu)
fiber-annotation (0.2.0)
fiber-local (1.1.0)
fiber-storage
fiber-storage (1.0.1)
gem-release (2.2.2)
generator_spec (0.10.0)
activesupport (>= 3.0.0)
Expand All @@ -173,6 +188,7 @@ GEM
i18n (1.14.7)
concurrent-ruby (~> 1.0)
io-console (0.8.0)
io-event (1.12.1)
irb (1.15.1)
pp (>= 0.6.0)
rdoc (>= 4.0.0)
Expand Down Expand Up @@ -205,6 +221,7 @@ GEM
marcel (1.0.4)
matrix (0.4.2)
method_source (1.1.0)
metrics (0.14.0)
mini_mime (1.1.5)
minitest (5.25.4)
mize (0.4.1)
Expand Down Expand Up @@ -411,6 +428,7 @@ GEM
tins (1.33.0)
bigdecimal
sync
traces (0.18.1)
turbolinks (5.2.1)
turbolinks-source (~> 5.2)
turbolinks-source (5.2.0)
Expand Down
87 changes: 83 additions & 4 deletions react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,91 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
# So we strip extra newlines from the template string and add a single newline
response.stream.write(template_string)

@rorp_rendering_fibers.each do |fiber|
while (chunk = fiber.resume)
response.stream.write(chunk)
begin
drain_streams_concurrently
ensure
response.stream.close if close_stream_at_end
end
end

private

# Drains all streaming fibers concurrently using a producer-consumer pattern.
#
# Producer tasks: Each fiber drains its stream and enqueues chunks to a shared queue.
# Consumer task: Single writer dequeues chunks and writes them to the response stream.
#
# Ordering guarantees:
# - Chunks from the same component maintain their order
# - Chunks from different components may interleave based on production timing
# - The first component to produce a chunk will have it written first
#
# Memory management:
# - Uses a limited queue (configured via concurrent_component_streaming_buffer_size)
# - Producers block when the queue is full, providing backpressure
# - This prevents unbounded memory growth from fast producers
def drain_streams_concurrently
require "async"
require "async/limited_queue"

return if @rorp_rendering_fibers.empty?

Sync do |parent|
# To avoid memory bloat, we use a limited queue to buffer chunks in memory.
buffer_size = ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
queue = Async::LimitedQueue.new(buffer_size)

# Consumer task: Single writer dequeues and writes to response stream
writer = build_writer_task(parent: parent, queue: queue)
# Producer tasks: Each fiber drains its stream and enqueues chunks
tasks = build_producer_tasks(parent: parent, queue: queue)

# This structure ensures that even if a producer task fails, we always
# signal the writer to stop and then wait for it to finish draining
# any remaining items from the queue before propagating the error.
begin
tasks.each(&:wait)
ensure
# `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit.
queue.close
writer.wait
end
end
end

def build_producer_tasks(parent:, queue:)
@rorp_rendering_fibers.each_with_index.map do |fiber, idx|
parent.async do
loop do
# Check if client disconnected before expensive operations
break if response.stream.closed?

chunk = fiber.resume
break unless chunk

# Will be blocked if the queue is full until a chunk is dequeued
queue.enqueue([idx, chunk])
end
rescue IOError, Errno::EPIPE
# Client disconnected - stop producing
break
end
end
end

def build_writer_task(parent:, queue:)
parent.async do
loop do
pair = queue.dequeue
break if pair.nil?

_idx_from_queue, item = pair
response.stream.write(item)
end
rescue IOError, Errno::EPIPE
# Client disconnected - stop writing
nil
end
response.stream.close if close_stream_at_end
end
end
end
42 changes: 40 additions & 2 deletions react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def self.configuration
rsc_payload_generation_url_path: Configuration::DEFAULT_RSC_PAYLOAD_GENERATION_URL_PATH,
rsc_bundle_js_file: Configuration::DEFAULT_RSC_BUNDLE_JS_FILE,
react_client_manifest_file: Configuration::DEFAULT_REACT_CLIENT_MANIFEST_FILE,
react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE
react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE,
concurrent_component_streaming_buffer_size: Configuration::DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE
)
end

Expand All @@ -59,6 +60,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
DEFAULT_RSC_BUNDLE_JS_FILE = "rsc-bundle.js"
DEFAULT_REACT_CLIENT_MANIFEST_FILE = "react-client-manifest.json"
DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE = "react-server-client-manifest.json"
DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE = 64

attr_accessor :renderer_url, :renderer_password, :tracing,
:server_renderer, :renderer_use_fallback_exec_js, :prerender_caching,
Expand All @@ -70,6 +72,30 @@ class Configuration # rubocop:disable Metrics/ClassLength
:rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
:react_server_client_manifest_file

attr_reader :concurrent_component_streaming_buffer_size

# Sets the buffer size for concurrent component streaming.
#
# This value controls how many chunks can be buffered in memory during
# concurrent streaming operations. When producers generate chunks faster
# than they can be written to the client, this buffer prevents unbounded
# memory growth by blocking producers when the buffer is full.
#
# Memory implications:
# - Buffer size of 64 (default) with 1KB chunks = ~64KB max memory
# - Buffer size of 64 with 1MB chunks = ~64MB max memory
# - Consider your typical chunk size when configuring this value
#
# @param value [Integer] A positive integer specifying the buffer size
# @raise [ReactOnRailsPro::Error] if value is not a positive integer
def concurrent_component_streaming_buffer_size=(value)
unless value.is_a?(Integer) && value.positive?
raise ReactOnRailsPro::Error,
"config.concurrent_component_streaming_buffer_size must be a positive integer"
end
@concurrent_component_streaming_buffer_size = value
end

def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
renderer_use_fallback_exec_js: nil, prerender_caching: nil,
renderer_http_pool_size: nil, renderer_http_pool_timeout: nil,
Expand All @@ -79,7 +105,9 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
renderer_request_retry_limit: nil, throw_js_errors: nil, ssr_timeout: nil,
profile_server_rendering_js_code: nil, raise_non_shell_server_rendering_errors: nil,
enable_rsc_support: nil, rsc_payload_generation_url_path: nil,
rsc_bundle_js_file: nil, react_client_manifest_file: nil, react_server_client_manifest_file: nil)
rsc_bundle_js_file: nil, react_client_manifest_file: nil,
react_server_client_manifest_file: nil,
concurrent_component_streaming_buffer_size: DEFAULT_CONCURRENT_COMPONENT_STREAMING_BUFFER_SIZE)
self.renderer_url = renderer_url
self.renderer_password = renderer_password
self.server_renderer = server_renderer
Expand All @@ -105,6 +133,7 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
self.rsc_bundle_js_file = rsc_bundle_js_file
self.react_client_manifest_file = react_client_manifest_file
self.react_server_client_manifest_file = react_server_client_manifest_file
self.concurrent_component_streaming_buffer_size = concurrent_component_streaming_buffer_size
end

def setup_config_values
Expand All @@ -113,6 +142,7 @@ def setup_config_values
validate_remote_bundle_cache_adapter
setup_renderer_password
setup_assets_to_copy
validate_concurrent_component_streaming_buffer_size
setup_execjs_profiler_if_needed
check_react_on_rails_support_for_rsc
end
Expand Down Expand Up @@ -204,6 +234,14 @@ def validate_remote_bundle_cache_adapter
end
end

def validate_concurrent_component_streaming_buffer_size
return if concurrent_component_streaming_buffer_size.is_a?(Integer) &&
concurrent_component_streaming_buffer_size.positive?

raise ReactOnRailsPro::Error,
"config.concurrent_component_streaming_buffer_size must be a positive integer"
end

def setup_renderer_password
return if renderer_password.present?

Expand Down
6 changes: 3 additions & 3 deletions react_on_rails_pro/lib/react_on_rails_pro/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def self.rsc_bundle_hash
@rsc_bundle_hash = calc_bundle_hash(server_rsc_bundle_js_file_path)
end

# Returns the hashed file name when using webpacker. Useful for creating cache keys.
# Returns the hashed file name when using Shakapacker. Useful for creating cache keys.
def self.bundle_file_name(bundle_name)
# bundle_js_uri_from_packer can return a file path or a HTTP URL (for files served from the dev server)
# Pathname can handle both cases
Expand All @@ -117,8 +117,8 @@ def self.bundle_file_name(bundle_name)
pathname.basename.to_s
end

# Returns the hashed file name of the server bundle when using webpacker.
# Necessary fragment-caching keys.
# Returns the hashed file name of the server bundle when using Shakapacker.
# Necessary for fragment-caching keys.
def self.server_bundle_file_name
return @server_bundle_hash if @server_bundle_hash && !Rails.env.development?

Expand Down
1 change: 1 addition & 0 deletions react_on_rails_pro/react_on_rails_pro.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "execjs", "~> 2.9"
s.add_runtime_dependency "httpx", "~> 1.5"
s.add_runtime_dependency "jwt", "~> 2.7"
s.add_runtime_dependency "async", ">= 2.6"
s.add_runtime_dependency "rainbow"
s.add_runtime_dependency "react_on_rails", ReactOnRails::VERSION
s.add_development_dependency "bundler"
Expand Down
Loading
Loading