From 7b44351634c14b4fb7278b469e0f365b002d90f9 Mon Sep 17 00:00:00 2001
From: ihabadham
Date: Tue, 26 Aug 2025 13:28:20 +0300
Subject: [PATCH 01/39] add concurrent_stream_drain flag (default false)

---
 .../lib/react_on_rails_pro/configuration.rb | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
index 12d913a457..4ac54c1425 100644
--- a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
+++ b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
@@ -32,7 +32,8 @@ def self.configuration
       rsc_payload_generation_url_path: Configuration::DEFAULT_RSC_PAYLOAD_GENERATION_URL_PATH,
       rsc_bundle_js_file: Configuration::DEFAULT_RSC_BUNDLE_JS_FILE,
       react_client_manifest_file: Configuration::DEFAULT_REACT_CLIENT_MANIFEST_FILE,
-      react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE
+      react_server_client_manifest_file: Configuration::DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE,
+      concurrent_stream_drain: Configuration::DEFAULT_CONCURRENT_STREAM_DRAIN
     )
   end

@@ -59,6 +60,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
     DEFAULT_RSC_BUNDLE_JS_FILE = "rsc-bundle.js"
     DEFAULT_REACT_CLIENT_MANIFEST_FILE = "react-client-manifest.json"
     DEFAULT_REACT_SERVER_CLIENT_MANIFEST_FILE = "react-server-client-manifest.json"
+    DEFAULT_CONCURRENT_STREAM_DRAIN = false

     attr_accessor :renderer_url, :renderer_password, :tracing, :server_renderer, :renderer_use_fallback_exec_js,
                   :prerender_caching,
@@ -68,7 +70,7 @@ class Configuration # rubocop:disable Metrics/ClassLength
                   :renderer_request_retry_limit, :throw_js_errors, :ssr_timeout,
                   :profile_server_rendering_js_code, :raise_non_shell_server_rendering_errors,
                   :enable_rsc_support, :rsc_payload_generation_url_path, :rsc_bundle_js_file, :react_client_manifest_file,
-                  :react_server_client_manifest_file
+                  :react_server_client_manifest_file, :concurrent_stream_drain

     def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil, # rubocop:disable Metrics/AbcSize
                    renderer_use_fallback_exec_js: nil, prerender_caching: nil,
@@ -79,7 +81,8 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
                    renderer_request_retry_limit: nil, throw_js_errors: nil, ssr_timeout: nil,
                    profile_server_rendering_js_code: nil, raise_non_shell_server_rendering_errors: nil,
                    enable_rsc_support: nil, rsc_payload_generation_url_path: nil,
-                   rsc_bundle_js_file: nil, react_client_manifest_file: nil, react_server_client_manifest_file: nil)
+                   rsc_bundle_js_file: nil, react_client_manifest_file: nil, react_server_client_manifest_file: nil,
+                   concurrent_stream_drain: DEFAULT_CONCURRENT_STREAM_DRAIN)
       self.renderer_url = renderer_url
       self.renderer_password = renderer_password
       self.server_renderer = server_renderer
@@ -105,6 +108,7 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
       self.rsc_bundle_js_file = rsc_bundle_js_file
       self.react_client_manifest_file = react_client_manifest_file
       self.react_server_client_manifest_file = react_server_client_manifest_file
+      self.concurrent_stream_drain = concurrent_stream_drain
     end

     def setup_config_values
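
With the flag defined, a host application opts in through its initializer. A minimal sketch, assuming the gem's standard `ReactOnRailsPro.configure` block (the file path is illustrative):

    # config/initializers/react_on_rails_pro.rb
    ReactOnRailsPro.configure do |config|
      # Opt in to concurrent draining of streamed React components.
      # Defaults to false, so existing apps keep the sequential behavior.
      config.concurrent_stream_drain = true
    end
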
From df64df1229db0c6e88f37c70af89c49df843ec6f Mon Sep 17 00:00:00 2001
From: ihabadham
Date: Tue, 26 Aug 2025 13:29:37 +0300
Subject: [PATCH 02/39] add and bundle async runtime dependency

---
 react_on_rails_pro/Gemfile.lock               | 18 ++++++++++++++++++
 react_on_rails_pro/react_on_rails_pro.gemspec |  1 +
 2 files changed, 19 insertions(+)

diff --git a/react_on_rails_pro/Gemfile.lock b/react_on_rails_pro/Gemfile.lock
index 64b44f4d65..d72ae72051 100644
--- a/react_on_rails_pro/Gemfile.lock
+++ b/react_on_rails_pro/Gemfile.lock
@@ -22,6 +22,7 @@ PATH
   specs:
     react_on_rails_pro (16.2.0.beta.4)
       addressable
+      async (>= 2.6)
       connection_pool
       execjs (~> 2.9)
       httpx (~> 1.5)
@@ -107,6 +108,12 @@ GEM
       public_suffix (>= 2.0.2, < 7.0)
     amazing_print (1.6.0)
     ast (2.4.2)
+    async (2.27.4)
+      console (~> 1.29)
+      fiber-annotation
+      io-event (~> 1.11)
+      metrics (~> 0.12)
+      traces (~> 0.15)
     base64 (0.2.0)
     benchmark (0.4.0)
     bigdecimal (3.1.9)
@@ -134,6 +141,10 @@ GEM
     commonmarker (1.1.4-x86_64-linux)
     concurrent-ruby (1.3.5)
     connection_pool (2.5.0)
+    console (1.33.0)
+      fiber-annotation
+      fiber-local (~> 1.1)
+      json
     coveralls (0.8.23)
       json (>= 1.8, < 3)
       simplecov (~> 0.16.1)
@@ -158,6 +169,10 @@ GEM
     ffi (1.17.0-arm64-darwin)
     ffi (1.17.0-x86_64-darwin)
     ffi (1.17.0-x86_64-linux-gnu)
+    fiber-annotation (0.2.0)
+    fiber-local (1.1.0)
+      fiber-storage
+    fiber-storage (1.0.1)
     gem-release (2.2.2)
     generator_spec (0.10.0)
       activesupport (>= 3.0.0)
@@ -173,6 +188,7 @@ GEM
     i18n (1.14.7)
       concurrent-ruby (~> 1.0)
     io-console (0.8.0)
+    io-event (1.12.1)
     irb (1.15.1)
       pp (>= 0.6.0)
       rdoc (>= 4.0.0)
@@ -205,6 +221,7 @@ GEM
     marcel (1.0.4)
     matrix (0.4.2)
     method_source (1.1.0)
+    metrics (0.14.0)
     mini_mime (1.1.5)
     minitest (5.25.4)
     mize (0.4.1)
@@ -411,6 +428,7 @@ GEM
     tins (1.33.0)
       bigdecimal
       sync
+    traces (0.18.1)
     turbolinks (5.2.1)
       turbolinks-source (~> 5.2)
     turbolinks-source (5.2.0)

diff --git a/react_on_rails_pro/react_on_rails_pro.gemspec b/react_on_rails_pro/react_on_rails_pro.gemspec
index 59b0b6aab6..7988a1cdcd 100644
--- a/react_on_rails_pro/react_on_rails_pro.gemspec
+++ b/react_on_rails_pro/react_on_rails_pro.gemspec
@@ -37,6 +37,7 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency "execjs", "~> 2.9"
   s.add_runtime_dependency "httpx", "~> 1.5"
   s.add_runtime_dependency "jwt", "~> 2.7"
+  s.add_runtime_dependency "async", ">= 2.6"
   s.add_runtime_dependency "rainbow"
   s.add_runtime_dependency "react_on_rails", ReactOnRails::VERSION
   s.add_development_dependency "bundler"
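
The next commit wires the new dependency into the streaming concern. Its core shape is a fan-in: one producer task per rendering fiber and a single writer task that owns the response stream, joined by a queue. A self-contained sketch of that pattern with toy producers; `Sync`, `Task#async`, and `Async::Queue` are the gem's real APIs, while the chunk strings and timings here are made up:

    require "async"
    require "async/queue"

    Sync do |parent|
      queue = Async::Queue.new

      producers = 3.times.map do |idx|
        parent.async do
          3.times do |n|
            sleep(0.01 * (idx + 1)) # stagger so chunks interleave
            queue.enqueue("producer #{idx} chunk #{n}\n")
          end
        end
      end

      writer = parent.async do
        while (chunk = queue.dequeue) # nil ends the loop
          $stdout.write(chunk) # exactly one writer, so no interleaved partial writes
        end
      end

      producers.each(&:wait)
      queue.enqueue(nil) # sentinel: all producers are done
      writer.wait
    end
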
From d93d520bafab0dba48fc2b8a0bff88c70bd4f274 Mon Sep 17 00:00:00 2001
From: ihabadham
Date: Tue, 26 Aug 2025 13:30:37 +0300
Subject: [PATCH 03/39] concurrent fiber draining via Async with single
 writer; add tracing logs; behind config flag

---
 .../lib/react_on_rails_pro/concerns/stream.rb | 53 +++++++++++++++++--
 1 file changed, 49 insertions(+), 4 deletions(-)

diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
index d1cecb14cc..514307ae81 100644
--- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
+++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
@@ -38,12 +38,57 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
       # So we strip extra newlines from the template string and add a single newline
       response.stream.write(template_string)

-      @rorp_rendering_fibers.each do |fiber|
-        while (chunk = fiber.resume)
-          response.stream.write(chunk)
+      if ReactOnRailsPro.configuration.concurrent_stream_drain
+        require "async"
+        require "async/queue"
+
+        Sync do |parent|
+          queue = Async::Queue.new
+          remaining = @rorp_rendering_fibers.size
+
+          unless remaining.zero?
+            tasks = []
+            @rorp_rendering_fibers.each_with_index do |fiber, idx|
+              tasks << parent.async do
+                begin
+                  while (chunk = fiber.resume)
+                    queue.enqueue([idx, chunk])
+                  end
+                rescue StandardError => e
+                  queue.enqueue([idx, ""]) # minimal signal
+                ensure
+                  queue.enqueue([idx, :__done__])
+                end
+              end
+            end
+
+            writer = parent.async do
+              loop do
+                _idx, item = queue.dequeue
+                if item == :__done__
+                  remaining -= 1
+                  break if remaining.zero?
+                  next
+                end
+                Rails.logger.info { "[ReactOnRailsPro] stream write (mode=concurrent) idx=#{_idx} bytes=#{item.bytesize}" } if ReactOnRailsPro.configuration.tracing
+                response.stream.write(item)
+              end
+            end
+
+            tasks.each(&:wait)
+            writer.wait
+          end
+        end
+        response.stream.close if close_stream_at_end
+      else
+        @rorp_rendering_fibers.each_with_index do |fiber, idx|
+          while (chunk = fiber.resume)
+            Rails.logger.info { "[ReactOnRailsPro] stream write (mode=sequential) idx=#{idx} bytes=#{chunk.bytesize}" } if ReactOnRailsPro.configuration.tracing
+            response.stream.write(chunk)
+          end
         end
+        response.stream.close if close_stream_at_end
       end
-      response.stream.close if close_stream_at_end
     end
   end
 end

From b6bb65d71302d19977b38c8841403a67b0e8aa7f Mon Sep 17 00:00:00 2001
From: ihabadham
Date: Tue, 26 Aug 2025 14:26:01 +0300
Subject: [PATCH 04/39] make sequential draining robust to already finished
 fibers

- handle when a component fiber is already drained after the first resume
  (the shell).
- typically, streamed components will have more chunks besides the first chunk.
- this is an edge case exposed while testing.
- we're basically avoiding getting a FiberError: attempt to resume a
  terminated fiber

---
 .../lib/react_on_rails_pro/concerns/stream.rb | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
index 514307ae81..2b92f47650 100644
--- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
+++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
@@ -82,7 +82,13 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
         response.stream.close if close_stream_at_end
       else
         @rorp_rendering_fibers.each_with_index do |fiber, idx|
-          while (chunk = fiber.resume)
+          loop do
+            begin
+              chunk = fiber.resume
+            rescue FiberError
+              break
+            end
+            break unless chunk
             Rails.logger.info { "[ReactOnRailsPro] stream write (mode=sequential) idx=#{idx} bytes=#{chunk.bytesize}" } if ReactOnRailsPro.configuration.tracing
             response.stream.write(chunk)
           end
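
The `FiberError` this commit guards against is reproducible with plain Ruby fibers, independent of the gem. Resuming a fiber that has already finished raises instead of returning nil, so a drain loop has to treat the error as end-of-stream. A minimal illustration:

    fiber = Fiber.new { Fiber.yield "shell chunk" }

    fiber.resume # => "shell chunk" (consumed early, like the shell in render_to_string)
    fiber.resume # => nil (the fiber body returns; the fiber is now terminated)

    begin
      fiber.resume # one resume too many
    rescue FiberError
      # "attempt to resume a terminated fiber": treat as end-of-stream,
      # which is what the rescue in the patch does
    end
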
From bcd30c41a871668ae0989b4b41c4a5 Mon Sep 17 00:00:00 2001
From: ihabadham
Date: Tue, 26 Aug 2025 15:40:28 +0300
Subject: [PATCH 05/39] add default backpressure via Async::Semaphore and
 handle client disconnects

---
 .../lib/react_on_rails_pro/concerns/stream.rb | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
index 2b92f47650..14f5d3f86a 100644
--- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
+++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
@@ -41,9 +41,11 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
       if ReactOnRailsPro.configuration.concurrent_stream_drain
         require "async"
         require "async/queue"
+        require "async/semaphore"

         Sync do |parent|
           queue = Async::Queue.new
+          semaphore = Async::Semaphore.new(64)
           remaining = @rorp_rendering_fibers.size

           unless remaining.zero?
@@ -52,10 +54,10 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
               tasks << parent.async do
                 begin
                   while (chunk = fiber.resume)
-                    queue.enqueue([idx, chunk])
+                    semaphore.acquire { queue.enqueue([idx, chunk]) }
                   end
                 rescue StandardError => e
-                  queue.enqueue([idx, ""]) # minimal signal
+                  semaphore.acquire { queue.enqueue([idx, ""]) } # minimal signal
                 ensure
                   queue.enqueue([idx, :__done__])
                 end
               end
@@ -71,7 +73,14 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
                   next
                 end
                 Rails.logger.info { "[ReactOnRailsPro] stream write (mode=concurrent) idx=#{_idx} bytes=#{item.bytesize}" } if ReactOnRailsPro.configuration.tracing
-                response.stream.write(item)
+                begin
+                  response.stream.write(item)
+                rescue IOError, ActionController::Live::ClientDisconnected
+                  # Client disconnected: stop early.
+                  break
+                ensure
+                  semaphore.release
+                end
               end
             end
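
The semaphore above bounds how many rendered chunks can sit in the queue waiting on a slow client. One API subtlety worth noting: the block form `semaphore.acquire { ... }` used in the patch releases its permit when the block exits, while the writer also calls `release` after each write. The sketch below shows the bounded-buffer idea with the bare acquire/release pairing; the limit of 4 and the timings are illustrative (the patch uses 64):

    require "async"
    require "async/queue"
    require "async/semaphore"

    Sync do |parent|
      queue = Async::Queue.new
      semaphore = Async::Semaphore.new(4) # at most 4 chunks in flight

      producer = parent.async do
        16.times do |n|
          semaphore.acquire # blocks while 4 chunks are already buffered
          queue.enqueue("chunk #{n}")
        end
        queue.enqueue(nil) # sentinel: done producing
      end

      writer = parent.async do
        while (chunk = queue.dequeue)
          sleep(0.01)       # simulate a slow client connection
          puts chunk
          semaphore.release # a permit frees up only after the write completes
        end
      end

      producer.wait
      writer.wait
    end
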
From 2941c29f977ed177a6a7d978b8ff8c588375100b Mon Sep 17 00:00:00 2001
From: ihabadham
Date: Tue, 26 Aug 2025 15:41:16 +0300
Subject: [PATCH 06/39] add controller streaming specs for sequential vs
 concurrent, ordering, edge cases, and producer error

---
 .../spec/react_on_rails_pro/stream_spec.rb | 130 ++++++++++++++++++
 1 file changed, 130 insertions(+)

diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
index 86afdc65d2..d341d2800e 100644
--- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
+++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
@@ -343,3 +343,133 @@
     end
   end
 end
+
+describe "Controller streaming (concurrent vs sequential)" do
+  # Lightweight fakes mirroring controller concern behavior without HTTPX.
+  class TestStream
+    def initialize(chunks_with_delays:, raise_after: nil)
+      @chunks_with_delays = chunks_with_delays
+      @raise_after = raise_after
+    end
+
+    def each_chunk
+      return enum_for(:each_chunk) unless block_given?
+
+      count = 0
+      @chunks_with_delays.each do |(delay, data)|
+        begin
+          require "async"
+          task = Async::Task.current
+          task ? task.sleep(delay) : sleep(delay)
+        rescue StandardError
+          sleep(delay)
+        end
+        yield data
+        count += 1
+        raise "Fake error" if @raise_after && count >= @raise_after
+      end
+    end
+  end
+
+  class TestResponseStream
+    attr_reader :writes
+    def initialize
+      @writes = []
+      @closed = false
+    end
+    def write(data)
+      @writes << data
+    end
+    def close
+      @closed = true
+    end
+    def closed?
+      @closed
+    end
+  end
+
+  class TestController
+    include ReactOnRailsPro::Stream
+    attr_reader :response
+    def initialize(streams)
+      @streams = streams
+      @response = OpenStruct.new(stream: TestResponseStream.new)
+    end
+    def render_to_string(template:, **_opts)
+      @rorp_rendering_fibers ||= []
+      initial_chunks = []
+      @streams.each do |s|
+        fiber = Fiber.new do
+          s.each_chunk do |chunk|
+            Fiber.yield chunk
+          end
+        end
+        initial_chunks << fiber.resume
+        @rorp_rendering_fibers << fiber
+      end
+      ["TEMPLATE\n", *initial_chunks].join
+    end
+  end
+
+  let(:a_stream) { TestStream.new(chunks_with_delays: [[0.30, "A1\n"], [0.90, "A2\n"]]) }
+  let(:b_stream) { TestStream.new(chunks_with_delays: [[0.10, "B1\n"], [0.10, "B2\n"], [0.20, "B3\n"]]) }
+
+  def run_and_collect(streams:, concurrent:)
+    original = ReactOnRailsPro.configuration.concurrent_stream_drain
+    ReactOnRailsPro.configuration.concurrent_stream_drain = concurrent
+    controller = TestController.new(streams)
+    controller.stream_view_containing_react_components(template: "ignored")
+    [controller.response.stream.writes, controller.response.stream.closed?]
+  ensure
+    ReactOnRailsPro.configuration.concurrent_stream_drain = original
+  end
+
+  it "gates by config (sequential vs concurrent)" do
+    writes_seq, closed_seq = run_and_collect(streams: [a_stream, b_stream], concurrent: false)
+    writes_conc, closed_conc = run_and_collect(streams: [a_stream, b_stream], concurrent: true)
+
+    expect(writes_seq.first).to start_with("TEMPLATE")
+    expect(writes_conc.first).to start_with("TEMPLATE")
+
+    joined_seq = writes_seq.drop(1).join
+    joined_conc = writes_conc.drop(1).join
+
+    expect(joined_seq).to match(/A2.*B2.*B3/m)
+    expect(joined_conc).to match(/B2.*A2/m)
+
+    expect(closed_seq).to be true
+    expect(closed_conc).to be true
+  end
+
+  it "preserves per-component order" do
+    multi_a = TestStream.new(chunks_with_delays: [[0.05, "X1\n"], [0.05, "X2\n"], [0.05, "X3\n"]])
+    multi_b = TestStream.new(chunks_with_delays: [[0.01, "Y1\n"], [0.02, "Y2\n"]])
+    writes, _ = run_and_collect(streams: [multi_a, multi_b], concurrent: true)
+    joined = writes.join
+    # X1 is inline in template; ensure X2 before X3 in remaining output.
+    expect(joined).to match(/X2.*X3/m)
+  end
+
+  it "handles zero fibers" do
+    writes, closed = run_and_collect(streams: [], concurrent: true)
+    expect(writes).to eq(["TEMPLATE\n"])
+    expect(closed).to be true
+  end
+
+  it "handles one fiber same as before" do
+    single = TestStream.new(chunks_with_delays: [[0.05, "S1\n"], [0.05, "S2\n"]])
+    writes_seq, _ = run_and_collect(streams: [single], concurrent: false)
+    writes_conc, _ = run_and_collect(streams: [single], concurrent: true)
+    expect(writes_seq.join).to include("S2\n")
+    expect(writes_conc.join).to include("S2\n")
+  end
+
+  it "continues other producers when one errors" do
+    erring = TestStream.new(chunks_with_delays: [[0.01, "E1\n"]], raise_after: 1)
+    ok = TestStream.new(chunks_with_delays: [[0.01, "O1\n"], [0.02, "O2\n"]])
+    writes, _ = run_and_collect(streams: [erring, ok], concurrent: true)
+    joined = writes.join
+    expect(joined).to include("
"]) } # minimal signal
-                ensure
-                  queue.enqueue([idx, :__done__])
-                end
-              end
-            end
-
-            writer = parent.async do
-              loop do
-                _idx, item = queue.dequeue
-                if item == :__done__
-                  remaining -= 1
-                  break if remaining.zero?
-                  next
-                end
-                Rails.logger.info { "[ReactOnRailsPro] stream write (mode=concurrent) idx=#{_idx} bytes=#{item.bytesize}" } if ReactOnRailsPro.configuration.tracing
-                begin
-                  response.stream.write(item)
-                rescue IOError, ActionController::Live::ClientDisconnected
-                  # Client disconnected: stop early.
-                  break
-                ensure
-                  semaphore.release
-                end
-              end
-            end
-
-            tasks.each(&:wait)
-            writer.wait
+        drain_streams_concurrently
+      else
+        drain_streams_sequentially
+      end
+      response.stream.close if close_stream_at_end
+    end
+
+    private
+
+    def drain_streams_concurrently
+      require "async"
+      require "async/queue"
+      require "async/semaphore"
+
+      Sync do |parent|
+        queue = Async::Queue.new
+        semaphore = Async::Semaphore.new(64)
+        remaining = @rorp_rendering_fibers.size
+
+        return if remaining.zero?
+
+        tasks = build_producer_tasks(parent: parent, queue: queue, semaphore: semaphore)
+        writer = build_writer_task(parent: parent, queue: queue, semaphore: semaphore, remaining: remaining)
+
+        tasks.each(&:wait)
+        writer.wait
+      end
+    end
+
+    def build_producer_tasks(parent:, queue:, semaphore:)
+      @rorp_rendering_fibers.each_with_index.map do |fiber, idx|
+        parent.async do
+          while (chunk = fiber.resume)
+            semaphore.acquire { queue.enqueue([idx, chunk]) }
+          end
+        rescue StandardError => e
+          error_msg = ""
+          semaphore.acquire { queue.enqueue([idx, error_msg]) } # minimal signal
+        ensure
+          queue.enqueue([idx, :__done__])
+        end
+      end
+    end
+
+    def build_writer_task(parent:, queue:, semaphore:, remaining:)
+      parent.async do
+        remaining_count = remaining
+        loop do
+          idx_from_queue, item = queue.dequeue
+          if item == :__done__
+            remaining_count -= 1
+            break if remaining_count.zero?
+
+            next
+          end
+          log_stream_write(mode: :concurrent, idx: idx_from_queue, bytesize: item.bytesize)
+          begin
+            response.stream.write(item)
+          rescue IOError, ActionController::Live::ClientDisconnected
+            break
+          ensure
+            semaphore.release
+          end
+        end
+      end
+    end
+
+    def drain_streams_sequentially
+      @rorp_rendering_fibers.each_with_index do |fiber, idx|
+        loop do
+          begin
+            chunk = fiber.resume
+          rescue FiberError
+            break
+          end
+          break unless chunk
+
+          log_stream_write(mode: :sequential, idx: idx, bytesize: chunk.bytesize)
+          response.stream.write(chunk)
+        end
+      end
+    end
+
+    def log_stream_write(mode:, idx:, bytesize:)
+      return unless ReactOnRailsPro.configuration.tracing
+
+      message = "[ReactOnRailsPro] stream write (mode=#{mode}) idx=#{idx} bytes=#{bytesize}"
+      Rails.logger.info { message }
+    end
   end
 end

diff --git a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
index 4ac54c1425..878abba2a8 100644
--- a/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
+++ b/react_on_rails_pro/lib/react_on_rails_pro/configuration.rb
@@ -81,7 +81,8 @@ def initialize(renderer_url: nil, renderer_password: nil, server_renderer: nil,
                    renderer_request_retry_limit: nil, throw_js_errors: nil, ssr_timeout: nil,
                    profile_server_rendering_js_code: nil, raise_non_shell_server_rendering_errors: nil,
                    enable_rsc_support: nil, rsc_payload_generation_url_path: nil,
-                   rsc_bundle_js_file: nil, react_client_manifest_file: nil, react_server_client_manifest_file: nil,
+                   rsc_bundle_js_file: nil, react_client_manifest_file: nil,
+                   react_server_client_manifest_file: nil,
                    concurrent_stream_drain: DEFAULT_CONCURRENT_STREAM_DRAIN)
       self.renderer_url = renderer_url
       self.renderer_password = renderer_password

diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
index 8370e35760..6160b77af8 100644
--- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
+++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
@@ -2,6 +2,104 @@

 require_relative "spec_helper"

+# Test helper classes for streaming specs
+class TestStream
+  def initialize(chunks_with_delays:, raise_after: nil)
+    @chunks_with_delays = chunks_with_delays
+    @raise_after = raise_after
+  end
+
+  def each_chunk
+    return enum_for(:each_chunk) unless block_given?
+
+    count = 0
+    @chunks_with_delays.each do |(delay, data)|
+      begin
+        require "async"
+        task = Async::Task.current
+        task ? task.sleep(delay) : sleep(delay)
+      rescue StandardError
+        sleep(delay)
+      end
+      yield data
+      count += 1
+      raise "Fake error" if @raise_after && count >= @raise_after
+    end
+  end
+end
+
+class TestResponseStream
+  attr_reader :writes
+
+  def initialize
+    @writes = []
+    @closed = false
+  end
+
+  def write(data)
+    @writes << data
+  end
+
+  def close
+    @closed = true
+  end
+
+  def closed?
+    @closed
+  end
+end
+
+class SlowResponseStream < TestResponseStream
+  attr_reader :timestamps
+
+  def initialize(delay: 0.05)
+    super()
+    @delay = delay
+    @timestamps = []
+  end
+
+  def write(data)
+    sleep(@delay)
+    @timestamps << Process.clock_gettime(Process::CLOCK_MONOTONIC)
+    super
+  end
+end
+
+ResponseStruct = Struct.new(:stream)
+
+class TestController
+  include ReactOnRailsPro::Stream
+
+  attr_reader :response
+
+  def initialize(streams)
+    @streams = streams
+    @response = ResponseStruct.new(TestResponseStream.new)
+  end
+
+  def render_to_string(**_opts)
+    @rorp_rendering_fibers ||= []
+    initial_chunks = []
+    @streams.each do |s|
+      fiber = Fiber.new do
+        s.each_chunk do |chunk|
+          Fiber.yield chunk
+        end
+      end
+      initial_chunks << fiber.resume
+      @rorp_rendering_fibers << fiber
+    end
+    ["TEMPLATE\n", *initial_chunks].join
+  end
+end
+
+class SlowWriterController < TestController
+  def initialize(streams)
+    super(streams)
+    @response = ResponseStruct.new(SlowResponseStream.new(delay: 0.05))
+  end
+end
+
 RSpec.describe "Streaming API" do
   let(:origin) { "http://api.example.com" }
   let(:path) { "/stream" }
@@ -342,179 +440,92 @@
       expect(mocked_block).to have_received(:call).with("First chunk")
     end
   end
-end
-
-describe "Controller streaming (concurrent vs sequential)" do
-  # Lightweight fakes mirroring controller concern behavior without HTTPX.
-  class TestStream
-    def initialize(chunks_with_delays:, raise_after: nil)
-      @chunks_with_delays = chunks_with_delays
-      @raise_after = raise_after
-    end
-
-    def each_chunk
-      return enum_for(:each_chunk) unless block_given?
-
-      count = 0
-      @chunks_with_delays.each do |(delay, data)|
-        begin
-          require "async"
-          task = Async::Task.current
-          task ? task.sleep(delay) : sleep(delay)
-        rescue StandardError
-          sleep(delay)
-        end
-        yield data
-        count += 1
-        raise "Fake error" if @raise_after && count >= @raise_after
-      end
-    end
-  end
-
-  class TestResponseStream
-    attr_reader :writes
-    def initialize
-      @writes = []
-      @closed = false
-    end
-    def write(data)
-      @writes << data
-    end
-    def close
-      @closed = true
-    end
-    def closed?
-      @closed
-    end
-  end
-
-  class SlowResponseStream < TestResponseStream
-    attr_reader :timestamps
-    def initialize(delay: 0.05)
-      super()
-      @delay = delay
-      @timestamps = []
-    end
-    def write(data)
-      sleep(@delay)
-      @timestamps << Process.clock_gettime(Process::CLOCK_MONOTONIC)
-      super
-    end
-  end
-
-  class TestController
-    include ReactOnRailsPro::Stream
-    attr_reader :response
-    def initialize(streams)
-      @streams = streams
-      @response = OpenStruct.new(stream: TestResponseStream.new)
-    end
-    def render_to_string(template:, **_opts)
-      @rorp_rendering_fibers ||= []
-      initial_chunks = []
-      @streams.each do |s|
-        fiber = Fiber.new do
-          s.each_chunk do |chunk|
-            Fiber.yield chunk
-          end
-        end
-        initial_chunks << fiber.resume
-        @rorp_rendering_fibers << fiber
-      end
-      ["TEMPLATE\n", *initial_chunks].join
-    end
-  end
-
-  class SlowWriterController < TestController
-    def initialize(streams)
-      @streams = streams
-      @response = OpenStruct.new(stream: SlowResponseStream.new(delay: 0.05))
-    end
-  end
-
-  let(:a_stream) { TestStream.new(chunks_with_delays: [[0.30, "A1\n"], [0.90, "A2\n"]]) }
-  let(:b_stream) { TestStream.new(chunks_with_delays: [[0.10, "B1\n"], [0.10, "B2\n"], [0.20, "B3\n"]]) }
-
-  def run_and_collect(streams:, concurrent:)
-    original = ReactOnRailsPro.configuration.concurrent_stream_drain
-    ReactOnRailsPro.configuration.concurrent_stream_drain = concurrent
-    controller = TestController.new(streams)
-    controller.stream_view_containing_react_components(template: "ignored")
-    [controller.response.stream.writes, controller.response.stream.closed?]
-  ensure
-    ReactOnRailsPro.configuration.concurrent_stream_drain = original
-  end
-
-  it "gates by config (sequential vs concurrent)" do
-    writes_seq, closed_seq = run_and_collect(streams: [a_stream, b_stream], concurrent: false)
-    writes_conc, closed_conc = run_and_collect(streams: [a_stream, b_stream], concurrent: true)
-
-    expect(writes_seq.first).to start_with("TEMPLATE")
-    expect(writes_conc.first).to start_with("TEMPLATE")
-
-    joined_seq = writes_seq.drop(1).join
-    joined_conc = writes_conc.drop(1).join
-
-    expect(joined_seq).to match(/A2.*B2.*B3/m)
-    expect(joined_conc).to match(/B2.*A2/m)
-
-    expect(closed_seq).to be true
-    expect(closed_conc).to be true
-  end
+  describe "Controller streaming (concurrent vs sequential)" do
+    let(:a_stream) { TestStream.new(chunks_with_delays: [[0.30, "A1\n"], [0.90, "A2\n"]]) }
+    let(:b_stream) { TestStream.new(chunks_with_delays: [[0.10, "B1\n"], [0.10, "B2\n"], [0.20, "B3\n"]]) }
+
+    def run_and_collect(streams:, concurrent:)
+      original = ReactOnRailsPro.configuration.concurrent_stream_drain
+      ReactOnRailsPro.configuration.concurrent_stream_drain = concurrent
+      controller = TestController.new(streams)
+      controller.stream_view_containing_react_components(template: "ignored")
+      [controller.response.stream.writes, controller.response.stream.closed?]
+    ensure
+      ReactOnRailsPro.configuration.concurrent_stream_drain = original
+    end
+
+    it "gates by config (sequential vs concurrent)" do
+      writes_seq, closed_seq = run_and_collect(streams: [a_stream, b_stream], concurrent: false)
+      writes_conc, closed_conc = run_and_collect(streams: [a_stream, b_stream], concurrent: true)
+
+      expect(writes_seq.first).to start_with("TEMPLATE")
+      expect(writes_conc.first).to start_with("TEMPLATE")
+
+      joined_seq = writes_seq.drop(1).join
+      joined_conc = writes_conc.drop(1).join
+
+      expect(joined_seq).to match(/A2.*B2.*B3/m)
+      expect(joined_conc).to match(/B2.*A2/m)
+
+      expect(closed_seq).to be true
+      expect(closed_conc).to be true
+    end
+
+    it "preserves per-component order" do
+      multi_a = TestStream.new(chunks_with_delays: [[0.05, "X1\n"], [0.05, "X2\n"], [0.05, "X3\n"]])
+      multi_b = TestStream.new(chunks_with_delays: [[0.01, "Y1\n"], [0.02, "Y2\n"]])
+      writes, = run_and_collect(streams: [multi_a, multi_b], concurrent: true)
+      joined = writes.join
+      # X1 is inline in template; ensure X2 before X3 in remaining output.
-    expect(joined).to match(/X2.*X3/m)
-  end
+    it "handles zero fibers" do
+      writes, closed = run_and_collect(streams: [], concurrent: true)
+      expect(writes).to eq(["TEMPLATE\n"])
+      expect(closed).to be true
+    end
-  it "handles zero fibers" do
-    writes, closed = run_and_collect(streams: [], concurrent: true)
-    expect(writes).to eq(["TEMPLATE\n"])
-    expect(closed).to be true
-  end
+    it "handles one fiber same as before" do
+      single = TestStream.new(chunks_with_delays: [[0.05, "S1\n"], [0.05, "S2\n"]])
+      writes_seq, = run_and_collect(streams: [single], concurrent: false)
+      writes_conc, = run_and_collect(streams: [single], concurrent: true)
+      expect(writes_seq.join).to include("S2\n")
+      expect(writes_conc.join).to include("S2\n")
+    end
-  it "handles one fiber same as before" do
-    single = TestStream.new(chunks_with_delays: [[0.05, "S1\n"], [0.05, "S2\n"]])
-    writes_seq, _ = run_and_collect(streams: [single], concurrent: false)
-    writes_conc, _ = run_and_collect(streams: [single], concurrent: true)
-    expect(writes_seq.join).to include("S2\n")
-    expect(writes_conc.join).to include("S2\n")
-  end
+    it "continues other producers when one errors" do
+      erring = TestStream.new(chunks_with_delays: [[0.01, "E1\n"]], raise_after: 1)
+      ok = TestStream.new(chunks_with_delays: [[0.01, "O1\n"], [0.02, "O2\n"]])
+      writes, = run_and_collect(streams: [erring, ok], concurrent: true)
+      joined = writes.join
+      expect(joined).to include("
" semaphore.acquire { queue.enqueue([idx, error_msg]) } # minimal signal
-        ensure
-          queue.enqueue([idx, :__done__])
         end
       end
     end

-    def build_writer_task(parent:, queue:, semaphore:, remaining:)
+    def build_writer_task(parent:, queue:, semaphore:)
       parent.async do
-        remaining_count = remaining
         loop do
-          idx_from_queue, item = queue.dequeue
-          if item == :__done__
-            remaining_count -= 1
-            break if remaining_count.zero?
-
-            next
-          end
+          pair = queue.dequeue
+          break if pair.nil?
+          idx_from_queue, item = pair
           log_stream_write(mode: :concurrent, idx: idx_from_queue, bytesize: safe_bytesize(item))
           begin
             response.stream.write(item)
           rescue IOError, ActionController::Live::ClientDisconnected
             break
           ensure
             semaphore.release
           end
         end
       end
     end
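
The commit that follows makes producer failures propagate instead of being converted into a minimal signal chunk, while still shutting down in order: close the queue so the writer's `dequeue` returns nil, let the writer drain, and only then re-raise. A self-contained sketch of that ensure-based ordering, relying on `Async::Queue#close` as the patch does (the `"boom"` failure is simulated):

    require "async"
    require "async/queue"

    begin
      Sync do |parent|
        queue = Async::Queue.new

        producer = parent.async do
          queue.enqueue("one chunk")
          raise "boom" # simulated mid-stream failure
        end

        writer = parent.async do
          while (chunk = queue.dequeue) # nil once the queue is closed and drained
            puts chunk
          end
        end

        begin
          producer.wait # re-raises "boom"
        ensure
          queue.close # signal end-of-stream to the writer
          writer.wait # let it finish writing what was already queued
        end
      end
    rescue RuntimeError => e
      puts "request failed: #{e.message}" # the error still reaches the caller
    end
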
From effb6df885c33a7cf7b92c24c7fe5ea30426f898 Mon Sep 17 00:00:00 2001
From: ihabadham
Date: Fri, 29 Aug 2025 20:41:25 +0300
Subject: [PATCH 13/39] refactor: propagate streaming errors instead of
 rescuing

- propagate runtime errors instead of silencing them
- keep gracefully handling already terminated fibers from resuming
- properly handle cleaning/shutting down when errors occur

---
 .../lib/react_on_rails_pro/concerns/stream.rb | 29 ++++++++++++-------
 .../spec/react_on_rails_pro/stream_spec.rb   | 16 ++++++----
 2 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
index b8fd0a4668..9835114c6d 100644
--- a/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
+++ b/react_on_rails_pro/lib/react_on_rails_pro/concerns/stream.rb
@@ -38,12 +38,15 @@ def stream_view_containing_react_components(template:, close_stream_at_end: true
       # So we strip extra newlines from the template string and add a single newline
       response.stream.write(template_string)

-      if ReactOnRailsPro.configuration.concurrent_stream_drain
-        drain_streams_concurrently
-      else
-        drain_streams_sequentially
+      begin
+        if ReactOnRailsPro.configuration.concurrent_stream_drain
+          drain_streams_concurrently
+        else
+          drain_streams_sequentially
+        end
+      ensure
+        response.stream.close if close_stream_at_end
       end
-      response.stream.close if close_stream_at_end
     end

     private

@@ -65,20 +68,29 @@ def drain_streams_concurrently
         writer = build_writer_task(parent: parent, queue: queue, semaphore: semaphore)
         tasks = build_producer_tasks(parent: parent, queue: queue, semaphore: semaphore)

+        # This structure ensures that even if a producer task fails, we always
+        # signal the writer to stop and then wait for it to finish draining
+        # any remaining items from the queue before propagating the error.
         begin
           tasks.each(&:wait)
         ensure
           # `close` signals end-of-stream; when writer tries to dequeue, it will get nil, so it will exit.
           queue.close
+          writer.wait
         end
-        writer.wait
       end
     end

     def build_producer_tasks(parent:, queue:, semaphore:)
       @rorp_rendering_fibers.each_with_index.map do |fiber, idx|
         parent.async do
-          while (chunk = fiber.resume)
+          loop do
+            begin
+              chunk = fiber.resume
+            rescue FiberError
+              break
+            end
+            break unless chunk
             # We use `acquire` and not `async` to create backpressure.
             # A simple comparison:
             # - `acquire`: Blocks this fiber until a permit is free -> forces backpressure.
             # - `async`: Never blocks
             # by buffering all chunks in memory.
             semaphore.acquire { queue.enqueue([idx, chunk]) }
           end
-        rescue StandardError => e
-          error_msg = ""
-          semaphore.acquire { queue.enqueue([idx, error_msg]) } # minimal signal
         end
       end
     end

diff --git a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
index 6160b77af8..873c94c1dc 100644
--- a/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
+++ b/react_on_rails_pro/spec/react_on_rails_pro/stream_spec.rb
@@ -495,13 +495,19 @@ def run_and_collect(streams:, concurrent:)
       expect(writes_conc.join).to include("S2\n")
     end

-    it "continues other producers when one errors" do
+    it "fails the request when a producer errors", :aggregate_failures do
       erring = TestStream.new(chunks_with_delays: [[0.01, "E1\n"]], raise_after: 1)
       ok = TestStream.new(chunks_with_delays: [[0.01, "O1\n"], [0.02, "O2\n"]])
-      writes, = run_and_collect(streams: [erring, ok], concurrent: true)
-      joined = writes.join
-      expect(joined).to include("