From 7d68a4c1da233ec5d875273fda48f9dc42b5f7bf Mon Sep 17 00:00:00 2001 From: saada Date: Thu, 14 Aug 2025 13:04:43 -0400 Subject: [PATCH 1/2] Add POST/PUT request support and optional parsing features - Add support for POST/PUT HTTP methods with JSON payload - Fix proxy authentication parameter names (proxy_username/proxy_password) - Add optional parsing mode to bypass SSE parsing for raw data streaming These changes enable more flexible usage patterns while maintaining backward compatibility with existing GET-only SSE implementations. --- lib/ld-eventsource/client.rb | 29 +++++++++++++++---- lib/ld-eventsource/impl/basic_event_parser.rb | 22 ++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 lib/ld-eventsource/impl/basic_event_parser.rb diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index c018151..aee69fe 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -1,4 +1,5 @@ require "ld-eventsource/impl/backoff" +require "ld-eventsource/impl/basic_event_parser" require "ld-eventsource/impl/buffered_line_reader" require "ld-eventsource/impl/event_parser" require "ld-eventsource/events" @@ -49,6 +50,9 @@ class Client # The default value for `reconnect_reset_interval` in {#initialize}. DEFAULT_RECONNECT_RESET_INTERVAL = 60 + # The default HTTP method for requests. + DEFAULT_HTTP_METHOD = "GET" + # # Creates a new SSE client. # @@ -84,6 +88,9 @@ class Client # @param socket_factory [#open] (nil) an optional factory object for creating sockets, # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` + # @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests + # @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods) + # @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -92,17 +99,23 @@ def initialize(uri, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, + http_method: DEFAULT_HTTP_METHOD, + http_payload: {}, last_event_id: nil, proxy: nil, logger: nil, - socket_factory: nil) + socket_factory: nil, + parse: true) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout + @http_method = http_method + @http_payload = http_payload @logger = logger || default_logger + @parse = parse http_client_options = {} if socket_factory http_client_options["socket_class"] = socket_factory @@ -121,6 +134,8 @@ def initialize(uri, http_client_options["proxy"] = { :proxy_address => @proxy.host, :proxy_port => @proxy.port, + :proxy_username => @proxy.user, + :proxy_password => @proxy.password, } end @@ -262,9 +277,9 @@ def connect cxn = nil begin @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request("GET", @uri, { - headers: build_headers, - }) + opts = { headers: build_headers } + opts[:json] = @http_payload unless @http_payload.empty? + cxn = @http_client.request(@http_method, @uri, opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -316,7 +331,11 @@ def read_stream(cxn) end end end - event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) + if @parse + event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) + else + event_parser = Impl::BasicEventParser.new(chunks) + end event_parser.items.each do |item| return if @stopped.value diff --git a/lib/ld-eventsource/impl/basic_event_parser.rb b/lib/ld-eventsource/impl/basic_event_parser.rb new file mode 100644 index 0000000..3eb0533 --- /dev/null +++ b/lib/ld-eventsource/impl/basic_event_parser.rb @@ -0,0 +1,22 @@ +require "ld-eventsource/events" + +module SSE + module Impl + class BasicEventParser + + def initialize(chunks) + @chunks = chunks + end + + # Generator that parses the input iterator and returns instances of {StreamEvent} or {SetRetryInterval}. + def items + Enumerator.new do |gen| + @chunks.each do |chunk| + item = StreamEvent.new(chunk.nil? ? :final_message : :message, chunk, nil, nil) + gen.yield item + end + end + end + end + end +end \ No newline at end of file From aaa1442d93b49badadf99d6990927ecc3b542a65 Mon Sep 17 00:00:00 2001 From: saada Date: Thu, 14 Aug 2025 13:30:27 -0400 Subject: [PATCH 2/2] Add optional SSL verification override Add verify_ssl parameter (defaults to true) to allow disabling SSL certificate verification for development, testing, and internal networks. This maintains security by default while providing flexibility when needed. --- lib/ld-eventsource/client.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index aee69fe..c4dd44a 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -7,6 +7,7 @@ require "concurrent/atomics" require "logger" +require "openssl" require "thread" require "uri" require "http" @@ -91,6 +92,7 @@ class Client # @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests # @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods) # @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks + # @param verify_ssl [Boolean] (true) whether to verify SSL certificates; set to false for development/testing # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -105,7 +107,8 @@ def initialize(uri, proxy: nil, logger: nil, socket_factory: nil, - parse: true) + parse: true, + verify_ssl: true) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @@ -117,6 +120,9 @@ def initialize(uri, @logger = logger || default_logger @parse = parse http_client_options = {} + unless verify_ssl + http_client_options[:ssl] = { verify_mode: OpenSSL::SSL::VERIFY_NONE } + end if socket_factory http_client_options["socket_class"] = socket_factory end