Skip to content

Commit 7c65c4b

Browse files
Gao-Junkeelerm84
andauthored
feat: Add ability to control SSE request method and body (#51)
Co-authored-by: Matthew Keeler <mkeeler@launchdarkly.com>
1 parent 45e2033 commit 7c65c4b

File tree

3 files changed

+318
-4
lines changed

3 files changed

+318
-4
lines changed

contract-tests/service.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
capabilities: [
2323
'headers',
2424
'last-event-id',
25+
'post',
2526
'read-timeout',
27+
'report',
2628
],
2729
}.to_json
2830
end
@@ -54,6 +56,8 @@
5456
entity = nil
5557
sse = SSE::Client.new(
5658
streamUrl,
59+
method: opts[:method] || "GET",
60+
payload: opts[:body] || nil,
5761
headers: opts[:headers] || {},
5862
last_event_id: opts[:lastEventId],
5963
read_timeout: opts[:readTimeoutMs].nil? ? nil : (opts[:readTimeoutMs].to_f / 1000),

lib/ld-eventsource/client.rb

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ class Client
8484
# @param socket_factory [#open] (nil) an optional factory object for creating sockets,
8585
# if you want to use something other than the default `TCPSocket`; it must implement
8686
# `open(uri, timeout)` to return a connected `Socket`
87+
# @param method [String] ("GET") the HTTP method to use for requests
88+
# @param payload [String, Hash, Array, #call] (nil) optional request payload. If payload is a Hash or
89+
# an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON
90+
# request body. If payload responds to #call, it will be invoked on each
91+
# request to generate the payload dynamically.
8792
# @yieldparam [Client] client the new client instance, before opening the connection
8893
#
8994
def initialize(uri,
@@ -95,13 +100,17 @@ def initialize(uri,
95100
last_event_id: nil,
96101
proxy: nil,
97102
logger: nil,
98-
socket_factory: nil)
103+
socket_factory: nil,
104+
method: "GET",
105+
payload: nil)
99106
@uri = URI(uri)
100107
@stopped = Concurrent::AtomicBoolean.new(false)
101108

102109
@headers = headers.clone
103110
@connect_timeout = connect_timeout
104111
@read_timeout = read_timeout
112+
@method = method.to_s.upcase
113+
@payload = payload
105114
@logger = logger || default_logger
106115
http_client_options = {}
107116
if socket_factory
@@ -262,9 +271,7 @@ def connect
262271
cxn = nil
263272
begin
264273
@logger.info { "Connecting to event stream at #{@uri}" }
265-
cxn = @http_client.request("GET", @uri, {
266-
headers: build_headers,
267-
})
274+
cxn = @http_client.request(@method, @uri, build_opts)
268275
if cxn.status.code == 200
269276
content_type = cxn.content_type.mime_type
270277
if content_type && content_type.start_with?("text/event-stream")
@@ -358,5 +365,18 @@ def build_headers
358365
h['Last-Event-Id'] = @last_id if !@last_id.nil? && @last_id != ""
359366
h.merge(@headers)
360367
end
368+
369+
def build_opts
370+
return {headers: build_headers} if @payload.nil?
371+
372+
# Resolve payload if it's callable
373+
resolved_payload = @payload.respond_to?(:call) ? @payload.call : @payload
374+
375+
if resolved_payload.is_a?(Hash) || resolved_payload.is_a?(Array)
376+
{headers: build_headers, json: resolved_payload}
377+
else
378+
{headers: build_headers, body: resolved_payload.to_s}
379+
end
380+
end
361381
end
362382
end

spec/client_spec.rb

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,4 +447,294 @@ def send_stream_content(res, content, keep_open:)
447447
end
448448
end
449449
end
450+
451+
describe "HTTP method parameter" do
452+
it "defaults to GET method" do
453+
with_server do |server|
454+
requests = Queue.new
455+
server.setup_response("/") do |req,res|
456+
requests << req
457+
send_stream_content(res, "", keep_open: true)
458+
end
459+
460+
with_client(subject.new(server.base_uri)) do |client|
461+
received_req = requests.pop
462+
expect(received_req.request_method).to eq("GET")
463+
end
464+
end
465+
end
466+
467+
it "uses explicit GET method" do
468+
with_server do |server|
469+
requests = Queue.new
470+
server.setup_response("/") do |req,res|
471+
requests << req
472+
send_stream_content(res, "", keep_open: true)
473+
end
474+
475+
with_client(subject.new(server.base_uri, method: "GET")) do |client|
476+
received_req = requests.pop
477+
expect(received_req.request_method).to eq("GET")
478+
end
479+
end
480+
end
481+
482+
it "uses explicit POST method" do
483+
with_server do |server|
484+
requests = Queue.new
485+
server.setup_response("/") do |req,res|
486+
requests << req
487+
send_stream_content(res, "", keep_open: true)
488+
end
489+
490+
with_client(subject.new(server.base_uri, method: "POST")) do |client|
491+
received_req = requests.pop
492+
expect(received_req.request_method).to eq("POST")
493+
end
494+
end
495+
end
496+
497+
it "normalizes method to uppercase" do
498+
with_server do |server|
499+
requests = Queue.new
500+
server.setup_response("/") do |req,res|
501+
requests << req
502+
send_stream_content(res, "", keep_open: true)
503+
end
504+
505+
with_client(subject.new(server.base_uri, method: "post")) do |client|
506+
received_req = requests.pop
507+
expect(received_req.request_method).to eq("POST")
508+
end
509+
end
510+
end
511+
end
512+
513+
describe "payload parameter" do
514+
it "sends string payload as body" do
515+
with_server do |server|
516+
requests = Queue.new
517+
server.setup_response("/") do |req,res|
518+
requests << req
519+
send_stream_content(res, "", keep_open: true)
520+
end
521+
522+
payload = "test-string-payload"
523+
with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client|
524+
received_req = requests.pop
525+
expect(received_req.request_method).to eq("POST")
526+
expect(received_req.body).to eq(payload)
527+
end
528+
end
529+
end
530+
531+
it "sends hash payload as JSON" do
532+
with_server do |server|
533+
requests = Queue.new
534+
server.setup_response("/") do |req,res|
535+
requests << req
536+
send_stream_content(res, "", keep_open: true)
537+
end
538+
539+
payload = {user: "test", id: 123}
540+
with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client|
541+
received_req = requests.pop
542+
expect(received_req.request_method).to eq("POST")
543+
expect(received_req.header["content-type"].first).to include("application/json")
544+
parsed_body = JSON.parse(received_req.body)
545+
expect(parsed_body).to eq({"user" => "test", "id" => 123})
546+
end
547+
end
548+
end
549+
550+
it "sends array payload as JSON" do
551+
with_server do |server|
552+
requests = Queue.new
553+
server.setup_response("/") do |req,res|
554+
requests << req
555+
send_stream_content(res, "", keep_open: true)
556+
end
557+
558+
payload = ["item1", "item2", "item3"]
559+
with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client|
560+
received_req = requests.pop
561+
expect(received_req.request_method).to eq("POST")
562+
expect(received_req.header["content-type"].first).to include("application/json")
563+
parsed_body = JSON.parse(received_req.body)
564+
expect(parsed_body).to eq(["item1", "item2", "item3"])
565+
end
566+
end
567+
end
568+
569+
it "works with GET method and payload" do
570+
with_server do |server|
571+
requests = Queue.new
572+
server.setup_response("/") do |req,res|
573+
requests << req
574+
send_stream_content(res, "", keep_open: true)
575+
end
576+
577+
payload = "get-with-payload"
578+
with_client(subject.new(server.base_uri, method: "GET", payload: payload)) do |client|
579+
received_req = requests.pop
580+
expect(received_req.request_method).to eq("GET")
581+
expect(received_req.body).to eq(payload)
582+
end
583+
end
584+
end
585+
end
586+
587+
describe "callable payload parameter" do
588+
it "invokes lambda payload on each request" do
589+
with_server do |server|
590+
requests = Queue.new
591+
server.setup_response("/") do |req,res|
592+
requests << req
593+
send_stream_content(res, "", keep_open: false) # Close to trigger reconnect
594+
end
595+
596+
counter = 0
597+
callable_payload = -> { counter += 1; "request-#{counter}" }
598+
599+
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client|
600+
# Wait for first request
601+
req1 = requests.pop
602+
expect(req1.body).to eq("request-1")
603+
604+
# Wait for reconnect and second request
605+
req2 = requests.pop
606+
expect(req2.body).to eq("request-2")
607+
end
608+
end
609+
end
610+
611+
it "invokes proc payload on each request" do
612+
with_server do |server|
613+
requests = Queue.new
614+
server.setup_response("/") do |req,res|
615+
requests << req
616+
send_stream_content(res, "", keep_open: false)
617+
end
618+
619+
counter = 0
620+
callable_payload = proc { counter += 1; {request_id: counter, timestamp: Time.now.to_i} }
621+
622+
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client|
623+
# Wait for first request
624+
req1 = requests.pop
625+
parsed_body1 = JSON.parse(req1.body)
626+
expect(parsed_body1["request_id"]).to eq(1)
627+
628+
# Wait for reconnect and second request
629+
req2 = requests.pop
630+
parsed_body2 = JSON.parse(req2.body)
631+
expect(parsed_body2["request_id"]).to eq(2)
632+
expect(parsed_body2["timestamp"]).to be >= parsed_body1["timestamp"]
633+
end
634+
end
635+
end
636+
637+
it "invokes custom callable object payload" do
638+
with_server do |server|
639+
requests = Queue.new
640+
server.setup_response("/") do |req,res|
641+
requests << req
642+
send_stream_content(res, "", keep_open: true)
643+
end
644+
645+
class TestPayloadGenerator
646+
def initialize
647+
@counter = 0
648+
end
649+
650+
def call
651+
@counter += 1
652+
{generator: "test", count: @counter}
653+
end
654+
end
655+
656+
callable_payload = TestPayloadGenerator.new
657+
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
658+
received_req = requests.pop
659+
parsed_body = JSON.parse(received_req.body)
660+
expect(parsed_body).to eq({"generator" => "test", "count" => 1})
661+
end
662+
end
663+
end
664+
665+
it "handles callable returning string" do
666+
with_server do |server|
667+
requests = Queue.new
668+
server.setup_response("/") do |req,res|
669+
requests << req
670+
send_stream_content(res, "", keep_open: true)
671+
end
672+
673+
callable_payload = -> { "dynamic-string-#{rand(1000)}" }
674+
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
675+
received_req = requests.pop
676+
expect(received_req.body).to match(/^dynamic-string-\d+$/)
677+
end
678+
end
679+
end
680+
681+
it "handles callable returning hash" do
682+
with_server do |server|
683+
requests = Queue.new
684+
server.setup_response("/") do |req,res|
685+
requests << req
686+
send_stream_content(res, "", keep_open: true)
687+
end
688+
689+
callable_payload = -> { {type: "dynamic", value: rand(1000)} }
690+
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
691+
received_req = requests.pop
692+
expect(received_req.header["content-type"].first).to include("application/json")
693+
parsed_body = JSON.parse(received_req.body)
694+
expect(parsed_body["type"]).to eq("dynamic")
695+
expect(parsed_body["value"]).to be_a(Integer)
696+
end
697+
end
698+
end
699+
700+
it "handles callable returning array" do
701+
with_server do |server|
702+
requests = Queue.new
703+
server.setup_response("/") do |req,res|
704+
requests << req
705+
send_stream_content(res, "", keep_open: true)
706+
end
707+
708+
callable_payload = -> { ["dynamic", Time.now.to_i] }
709+
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
710+
received_req = requests.pop
711+
expect(received_req.header["content-type"].first).to include("application/json")
712+
parsed_body = JSON.parse(received_req.body)
713+
expect(parsed_body[0]).to eq("dynamic")
714+
expect(parsed_body[1]).to be_a(Integer)
715+
end
716+
end
717+
end
718+
719+
it "handles callable returning other types by converting to string" do
720+
with_server do |server|
721+
requests = Queue.new
722+
server.setup_response("/") do |req,res|
723+
requests << req
724+
send_stream_content(res, "", keep_open: true)
725+
end
726+
727+
test_object = Object.new
728+
def test_object.to_s
729+
"custom-object-string"
730+
end
731+
732+
callable_payload = -> { test_object }
733+
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
734+
received_req = requests.pop
735+
expect(received_req.body).to eq("custom-object-string")
736+
end
737+
end
738+
end
739+
end
450740
end

0 commit comments

Comments
 (0)