From b963e875c9591ec1cbf7f35c6f18a2f639a2e79b Mon Sep 17 00:00:00 2001
From: Ismael Celis
Date: Mon, 4 Aug 2025 12:59:33 +0100
Subject: [PATCH] Do not block main server thread when launching streaming threads

Instead, handle the streaming queue in a new thread, which serialises writes
to the connection socket and handles errors such as IOError and Errno::EPIPE,
triggering the relevant callbacks. This is so that the server's request
thread (e.g. Puma's) can be returned to the pool quickly. Servers like Falcon
(fibers instead of threads) should not have this problem, but they still work
fine with this change (they will spawn an extra fiber, which should be cheap).

Possible issues: this change decouples the server's thread pool from
Datastar's streaming threads, which are currently unbounded (an app with
long-lived streams could spawn thousands of threads even if the server is
configured with a limited pool). This can be problematic because:

* The server can run out of resources.
* If the streams rely on services such as database connections, they could
  quickly drain those connection pools.

Possible solution: provide configuration for a separate, Datastar-specific
thread pool so that it can be sized according to available resources (such as
database connection pools). See the sketch appended after the diff.
---
 lib/datastar/dispatcher.rb                  | 48 ++++++++++++++++-----
 lib/datastar/server_sent_event_generator.rb |  3 ++
 spec/dispatcher_spec.rb                     | 44 ++++++++++++++-----
 spec/support/dispatcher_examples.rb         |  2 +
 4 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/lib/datastar/dispatcher.rb b/lib/datastar/dispatcher.rb
index 63031a3..98e4600 100644
--- a/lib/datastar/dispatcher.rb
+++ b/lib/datastar/dispatcher.rb
@@ -285,7 +285,7 @@ def stream_one(streamer)
     proc do |socket|
       generator = ServerSentEventGenerator.new(socket, signals:, view_context: @view_context)
       @on_connect.each { |callable| callable.call(generator) }
-      handling_errors(generator, socket) do
+      handling_sync_errors(generator, socket) do
         streamer.call(generator)
       end
     ensure
@@ -313,9 +313,10 @@ def stream_many(streamer)
       @on_connect.each { |callable| callable.call(conn_generator) }

       threads = @streamers.map do |streamer|
+        duped_signals = signs.dup.freeze
         @executor.spawn do
           # TODO: Review thread-safe view context
-          generator = ServerSentEventGenerator.new(@queue, signals: signs, view_context: @view_context)
+          generator = ServerSentEventGenerator.new(@queue, signals: duped_signals, view_context: @view_context)
           streamer.call(generator)
           @queue << :done
         rescue StandardError => e
@@ -323,7 +324,12 @@ def stream_many(streamer)
         end
       end

-      handling_errors(conn_generator, socket) do
+      # Now launch the control thread that actually writes to the socket.
+      # We don't want to block the main thread, so that servers like Puma,
+      # which have a limited thread pool, can keep serving other requests.
+      # Other streamers will push any StandardError exceptions to the queue,
+      # so we handle them here.
+      @executor.spawn do
         done_count = 0
         threads_size = @heartbeat_on ?
threads.size - 1 : threads.size @@ -332,24 +338,46 @@ def stream_many(streamer) done_count += 1 @queue << nil if done_count == threads_size elsif data.is_a?(Exception) - raise data + handle_streaming_error(data, socket) + @queue << nil else - socket << data + # Here we attempt writing to the actual socket + # which may raise an IOError if the client disconnected + begin + socket << data + rescue Exception => e + handle_streaming_error(e, socket) + @queue << nil + end end end + + ensure + @on_server_disconnect.each { |callable| callable.call(conn_generator) } + @executor.stop(threads) if threads + socket.close end - ensure - @executor.stop(threads) if threads - socket.close end end - # Run a streaming block while handling errors + # Handle errors caught during streaming + # @param error [Exception] the error that occurred + # @param socket [IO] the socket to pass to error handlers + def handle_streaming_error(error, socket) + case error + when IOError, Errno::EPIPE, Errno::ECONNRESET + @on_client_disconnect.each { |callable| callable.call(socket) } + when Exception + @on_error.each { |callable| callable.call(error) } + end + end + + # Run a block while handling errors # @param generator [ServerSentEventGenerator] # @param socket [IO] # @yield # @api private - def handling_errors(generator, socket, &) + def handling_sync_errors(generator, socket, &) yield @on_server_disconnect.each { |callable| callable.call(generator) } diff --git a/lib/datastar/server_sent_event_generator.rb b/lib/datastar/server_sent_event_generator.rb index e116e1a..f8c6de7 100644 --- a/lib/datastar/server_sent_event_generator.rb +++ b/lib/datastar/server_sent_event_generator.rb @@ -24,6 +24,9 @@ class ServerSentEventGenerator attr_reader :signals + # @param stream [IO, Queue] The IO stream or Queue to write to + # @option signals [Hash] A hash of signals (params) + # @option view_context [Object] The view context for rendering elements, if applicable. def initialize(stream, signals:, view_context: nil) @stream = stream @signals = signals diff --git a/spec/dispatcher_spec.rb b/spec/dispatcher_spec.rb index ba4e2d6..8c3b36b 100644 --- a/spec/dispatcher_spec.rb +++ b/spec/dispatcher_spec.rb @@ -2,20 +2,35 @@ class TestSocket attr_reader :lines, :open - def initialize + + def initialize(open: true) @lines = [] - @open = true + @open = open + @finish = Thread::Queue.new end def <<(line) + raise Errno::EPIPE, 'Socket closed' unless @open + @lines << line end - def close = @open = false + def close + @open = false + @finish << true + end def split_lines @lines.join.split("\n") end + + # Streams run in threads + # we can call this to signal the end of the stream + # in tests + def wait_for_close(&) + @finish.pop + yield if block_given? + end end RSpec.describe Datastar::Dispatcher do @@ -407,6 +422,8 @@ def self.render_in(view_context) = %(
\n#{view_context}\ndata: elements hello\ndata: elements
\n\n") @@ -448,8 +465,8 @@ def self.render_in(view_context) = %(
\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}\n#{view_context}
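
Below is a minimal, hypothetical sketch of the "possible solution" mentioned
in the commit message: a bounded, Datastar-specific executor built on
concurrent-ruby. It assumes only the spawn/stop interface the dispatcher
already calls (@executor.spawn, @executor.stop(threads)); the
BoundedThreadExecutor class, its max_threads: option and the
Datastar.configure wiring shown at the bottom are illustrative assumptions,
not part of the current codebase.

    # Hypothetical, illustrative sketch: a bounded executor the dispatcher
    # could be configured with instead of spawning unbounded threads.
    require 'concurrent'

    class BoundedThreadExecutor
      # max_threads should be sized against available resources,
      # e.g. the database connection pool.
      def initialize(max_threads: 10)
        @pool = Concurrent::FixedThreadPool.new(max_threads)
      end

      # Mirrors the dispatcher's `@executor.spawn { ... }` calls.
      # Returns a future handle; tasks queue when all threads are busy.
      def spawn(&block)
        Concurrent::Promises.future_on(@pool, &block)
      end

      # Mirrors `@executor.stop(threads)`: best-effort shutdown of
      # in-flight streaming tasks (the handles themselves are not needed).
      def stop(_handles = nil)
        @pool.shutdown
        @pool.wait_for_termination(5)
      end
    end

    # Hypothetical wiring; the actual configuration hook may differ.
    # Datastar.configure do |config|
    #   config.executor = BoundedThreadExecutor.new(max_threads: 20)
    # end

Note that with a fixed-size pool, streams beyond max_threads sit in the
pool's queue until a running stream finishes, so the pool size has to be
balanced against expected stream lifetimes as well as external resources
such as database connection pools.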