From dbdd226c74081f8f1e478a99f82af5c39cc4fa4a Mon Sep 17 00:00:00 2001 From: gaojun Date: Thu, 3 Apr 2025 16:05:18 +0800 Subject: [PATCH 1/5] Add ability to POST an SSE request --- lib/ld-eventsource/client.rb | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 6355a62..7969624 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -85,6 +85,10 @@ class Client # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` # @yieldparam [Client] client the new client instance, before opening the connection + # @payload payload [String | Hash | Array] (nil) optional request payload. If payload is + # provided, a POST request will be used, instead of a GET request. If payload is a Hash or + # an Array, it will be converted to JSON and sent as the request body. Also, reconnection + # is disabled if payload is set. # def initialize(uri, headers: {}, @@ -95,13 +99,15 @@ def initialize(uri, last_event_id: nil, proxy: nil, logger: nil, - socket_factory: nil) + socket_factory: nil, + payload: nil) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout + @payload = payload @logger = logger || default_logger http_client_options = {} if socket_factory @@ -243,6 +249,8 @@ def run_stream end begin reset_http + # When we post request with payload, reconnection should be avoided. + close if @payload rescue StandardError => e log_and_dispatch_error(e, "Unexpected error while closing stream") end @@ -262,9 +270,8 @@ def connect cxn = nil begin @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request("GET", @uri, { - headers: build_headers - }) + verb = @payload ? "POST" : "GET" + cxn = @http_client.request(verb, @uri, build_opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -358,5 +365,15 @@ def build_headers h['Last-Event-Id'] = @last_id if !@last_id.nil? && @last_id != "" h.merge(@headers) end + + def build_opts + return {headers: build_headers} if @payload.nil? + + if @payload.is_a?(Hash) || @payload.is_a?(Array) + {headers: build_headers, json: @payload} + else + {headers: build_headers, body: @payload.to_s} + end + end end end From 93f776e743959efaea5425644ef6eeb0280992dc Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 15 Aug 2025 14:34:21 -0400 Subject: [PATCH 2/5] Provide method as parameter option --- lib/ld-eventsource/client.rb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 7969624..19e0710 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -84,11 +84,11 @@ class Client # @param socket_factory [#open] (nil) an optional factory object for creating sockets, # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` - # @yieldparam [Client] client the new client instance, before opening the connection - # @payload payload [String | Hash | Array] (nil) optional request payload. If payload is - # provided, a POST request will be used, instead of a GET request. If payload is a Hash or + # @param method [String] ("GET") the HTTP method to use for requests + # @param payload [String, Hash, Array] (nil) optional request payload. If payload is a Hash or # an Array, it will be converted to JSON and sent as the request body. Also, reconnection # is disabled if payload is set. + # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, headers: {}, @@ -100,6 +100,7 @@ def initialize(uri, proxy: nil, logger: nil, socket_factory: nil, + method: "GET", payload: nil) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @@ -107,6 +108,7 @@ def initialize(uri, @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout + @method = method.to_s.upcase @payload = payload @logger = logger || default_logger http_client_options = {} @@ -270,8 +272,7 @@ def connect cxn = nil begin @logger.info { "Connecting to event stream at #{@uri}" } - verb = @payload ? "POST" : "GET" - cxn = @http_client.request(verb, @uri, build_opts) + cxn = @http_client.request(@method, @uri, build_opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") From 4b9c0417324642d724937f12e70335dd9a7cb288 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 15 Aug 2025 15:41:04 -0400 Subject: [PATCH 3/5] Allow payload to be callable --- lib/ld-eventsource/client.rb | 17 +- spec/client_spec.rb | 296 ++++++++++++++++++++++++++++++++++- 2 files changed, 302 insertions(+), 11 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 19e0710..0273b39 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -85,9 +85,9 @@ class Client # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` # @param method [String] ("GET") the HTTP method to use for requests - # @param payload [String, Hash, Array] (nil) optional request payload. If payload is a Hash or - # an Array, it will be converted to JSON and sent as the request body. Also, reconnection - # is disabled if payload is set. + # @param payload [String, Hash, Array, #call] (nil) optional request payload. If payload is a Hash or + # an Array, it will be converted to JSON and sent as the request body. If payload responds to #call, + # it will be invoked on each request to generate the payload dynamically. # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -251,8 +251,6 @@ def run_stream end begin reset_http - # When we post request with payload, reconnection should be avoided. - close if @payload rescue StandardError => e log_and_dispatch_error(e, "Unexpected error while closing stream") end @@ -370,10 +368,13 @@ def build_headers def build_opts return {headers: build_headers} if @payload.nil? - if @payload.is_a?(Hash) || @payload.is_a?(Array) - {headers: build_headers, json: @payload} + # Resolve payload if it's callable + resolved_payload = @payload.respond_to?(:call) ? @payload.call : @payload + + if resolved_payload.is_a?(Hash) || resolved_payload.is_a?(Array) + {headers: build_headers, json: resolved_payload} else - {headers: build_headers, body: @payload.to_s} + {headers: build_headers, body: resolved_payload.to_s} end end end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 51f516d..4aa88c1 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -57,7 +57,7 @@ def send_stream_content(res, content, keep_open:) requests << req send_stream_content(res, "", keep_open: true) end - + headers = { "Authorization" => "secret" } with_client(subject.new(server.base_uri, headers: headers)) do |client| @@ -82,7 +82,7 @@ def send_stream_content(res, content, keep_open:) requests << req send_stream_content(res, "", keep_open: true) end - + headers = { "Authorization" => "secret" } with_client(subject.new(server.base_uri, headers: headers, last_event_id: id)) do |client| @@ -438,7 +438,7 @@ def send_stream_content(res, content, keep_open:) server.setup_response("/") do |req,res| send_stream_content(res, "", keep_open: true) end - + with_client(subject.new(server.base_uri)) do |client| expect(client.closed?).to be(false) @@ -447,4 +447,294 @@ def send_stream_content(res, content, keep_open:) end end end + + describe "HTTP method parameter" do + it "defaults to GET method" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("GET") + end + end + end + + it "uses explicit GET method" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri, method: "GET")) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("GET") + end + end + end + + it "uses explicit POST method" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri, method: "POST")) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + end + end + end + + it "normalizes method to uppercase" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri, method: "post")) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + end + end + end + end + + describe "payload parameter" do + it "sends string payload as body" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = "test-string-payload" + with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + expect(received_req.body).to eq(payload) + end + end + end + + it "sends hash payload as JSON" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = {user: "test", id: 123} + with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body).to eq({"user" => "test", "id" => 123}) + end + end + end + + it "sends array payload as JSON" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = ["item1", "item2", "item3"] + with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body).to eq(["item1", "item2", "item3"]) + end + end + end + + it "works with GET method and payload" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = "get-with-payload" + with_client(subject.new(server.base_uri, method: "GET", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("GET") + expect(received_req.body).to eq(payload) + end + end + end + end + + describe "callable payload parameter" do + it "invokes lambda payload on each request" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: false) # Close to trigger reconnect + end + + counter = 0 + callable_payload = -> { counter += 1; "request-#{counter}" } + + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client| + # Wait for first request + req1 = requests.pop + expect(req1.body).to eq("request-1") + + # Wait for reconnect and second request + req2 = requests.pop + expect(req2.body).to eq("request-2") + end + end + end + + it "invokes proc payload on each request" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: false) + end + + counter = 0 + callable_payload = proc { counter += 1; {request_id: counter, timestamp: Time.now.to_i} } + + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client| + # Wait for first request + req1 = requests.pop + parsed_body1 = JSON.parse(req1.body) + expect(parsed_body1["request_id"]).to eq(1) + + # Wait for reconnect and second request + req2 = requests.pop + parsed_body2 = JSON.parse(req2.body) + expect(parsed_body2["request_id"]).to eq(2) + expect(parsed_body2["timestamp"]).to be >= parsed_body1["timestamp"] + end + end + end + + it "invokes custom callable object payload" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + class TestPayloadGenerator + def initialize + @counter = 0 + end + + def call + @counter += 1 + {generator: "test", count: @counter} + end + end + + callable_payload = TestPayloadGenerator.new + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + parsed_body = JSON.parse(received_req.body) + expect(parsed_body).to eq({"generator" => "test", "count" => 1}) + end + end + end + + it "handles callable returning string" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + callable_payload = -> { "dynamic-string-#{rand(1000)}" } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.body).to match(/^dynamic-string-\d+$/) + end + end + end + + it "handles callable returning hash" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + callable_payload = -> { {type: "dynamic", value: rand(1000)} } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body["type"]).to eq("dynamic") + expect(parsed_body["value"]).to be_a(Integer) + end + end + end + + it "handles callable returning array" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + callable_payload = -> { ["dynamic", Time.now.to_i] } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body[0]).to eq("dynamic") + expect(parsed_body[1]).to be_a(Integer) + end + end + end + + it "handles callable returning other types by converting to string" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + test_object = Object.new + def test_object.to_s + "custom-object-string" + end + + callable_payload = -> { test_object } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.body).to eq("custom-object-string") + end + end + end + end end From b3aa95798659760663d730d92d8024d65d73d223 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 15 Aug 2025 16:04:05 -0400 Subject: [PATCH 4/5] expand doc --- lib/ld-eventsource/client.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 4e714fe..8c80aee 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -86,8 +86,9 @@ class Client # `open(uri, timeout)` to return a connected `Socket` # @param method [String] ("GET") the HTTP method to use for requests # @param payload [String, Hash, Array, #call] (nil) optional request payload. If payload is a Hash or - # an Array, it will be converted to JSON and sent as the request body. If payload responds to #call, - # it will be invoked on each request to generate the payload dynamically. + # an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON + # request body. If payload responds to #call, it will be invoked on each + # request to generate the payload dynamically. # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, From 05e2e1533ecb7b079dfbbbe32a4d7324fe307da4 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 18 Aug 2025 11:14:05 -0400 Subject: [PATCH 5/5] sorting --- contract-tests/service.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/contract-tests/service.rb b/contract-tests/service.rb index 0a1d303..30220bf 100644 --- a/contract-tests/service.rb +++ b/contract-tests/service.rb @@ -22,7 +22,9 @@ capabilities: [ 'headers', 'last-event-id', + 'post', 'read-timeout', + 'report', ], }.to_json end @@ -54,6 +56,8 @@ entity = nil sse = SSE::Client.new( streamUrl, + method: opts[:method] || "GET", + payload: opts[:body] || nil, headers: opts[:headers] || {}, last_event_id: opts[:lastEventId], read_timeout: opts[:readTimeoutMs].nil? ? nil : (opts[:readTimeoutMs].to_f / 1000),