From e8e46fa5b6e70a395516d00d9dd450907c606f46 Mon Sep 17 00:00:00 2001 From: Nam Pham Date: Sun, 9 Nov 2025 13:53:56 +0900 Subject: [PATCH 1/4] dynamic routing key --- lib/fluent/plugin/out_rabbitmq.rb | 38 +++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/out_rabbitmq.rb b/lib/fluent/plugin/out_rabbitmq.rb index 30e0d53..0c6fd7f 100644 --- a/lib/fluent/plugin/out_rabbitmq.rb +++ b/lib/fluent/plugin/out_rabbitmq.rb @@ -20,7 +20,10 @@ module Fluent::Plugin class RabbitMQOutput < Output Fluent::Plugin.register_output("rabbitmq", self) - helpers :formatter, :inject, :compat_parameters + helpers :formatter, :inject, :compat_parameters, :event_emitter + + PLACEHOLDER_RECORD = /\$\{record\["([^"]+)"\]\}/.freeze + PLACEHOLDER_REGEX = /\$\{[^}]+\}/.freeze config_section :format do config_set_default :@type, "json" @@ -110,6 +113,10 @@ def configure(conf) @publish_options[:app_id] = @app_id if @app_id @formatter = formatter_create(default_type: @type) + + @use_evaluated_routing_key = @routing_key && @routing_key.match?(PLACEHOLDER_REGEX) + + #put "[INFO] RabbitMQ Output plugin configured. routing_key=#{routing_key}, use_evaluated_routing_key=#{use_evaluated_routing_key}" end def multi_workers_ready? @@ -137,12 +144,11 @@ def shutdown super end - def set_publish_options(tag, time, record) + def set_publish_options(tag, time, record, chunk=nil) + rk = resolve_routing_key(tag, time, record, chunk) + @publish_options[:timestamp] = time.to_i if @timestamp - - if @exchange_type != "fanout" - @publish_options[:routing_key] = @routing_key || tag - end + @publish_options[:routing_key] = rk if @exchange_type != "fanout" if @id_key id = record[@id_key] @@ -163,11 +169,29 @@ def write(chunk) tag = chunk.metadata.tag chunk.each do |time, record| - set_publish_options(tag, time, record) + set_publish_options(tag, time, record, chunk) record = inject_values_to_record(tag, time, record) buf = @formatter.format(tag, time, record) @bunny_exchange.publish(buf, @publish_options) end end + + def resolve_routing_key(tag, time, record, chunk=nil) + return @routing_key || tag unless @use_evaluated_routing_key + + if chunk + extract_placeholders(@routing_key, chunk) + else + expand_placeholders_runtime(@routing_key, tag, time, record) + end + end + + def expand_placeholders_runtime(str, tag, time, record) + s = str.dup + s.gsub!('${tag}', tag.to_s) + s.gsub!('${time}', time.to_i.to_s) + s.gsub!(PLACEHOLDER_RECORD) { record[$1] ? record[$1].to_s : "" } + s + end end end From af7c0e0d86b3258c323faad3fe2db3f6e96a1c22 Mon Sep 17 00:00:00 2001 From: Nam Pham Date: Sun, 9 Nov 2025 13:58:35 +0900 Subject: [PATCH 2/4] add test --- test/plugin/test_out_rabbitmq.rb | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/test/plugin/test_out_rabbitmq.rb b/test/plugin/test_out_rabbitmq.rb index bb15972..b0dc3fb 100644 --- a/test/plugin/test_out_rabbitmq.rb +++ b/test/plugin/test_out_rabbitmq.rb @@ -209,4 +209,24 @@ def test_buffered_emit assert_equal(record, JSON.parse(body)) assert_equal(@time, properties[:timestamp].to_i) end + + def test_dynamic_routing_key_from_record + d = create_driver(%[ + exchange test_out_dynamic + exchange_type topic + routing_key test_out_dynamic.${record["build_id"]} + ]) + + queue = @channel.queue("test_out_dynamic_123") + topic_exchange = Bunny::Exchange.new(@channel, "topic", "test_out_dynamic") + queue.bind(topic_exchange, routing_key: "test_out_dynamic.123") + + record = {"build_id" => "123", "msg" => "hello dynamic"} + d.run(default_tag: "app.build") do + d.feed(@time, record) + end + + _, _, body = queue.pop + assert_equal(record, JSON.parse(body)) + end end From 3ff9c2ba34c9b22ea52ba7f43555a1a716038f80 Mon Sep 17 00:00:00 2001 From: Nam Pham Date: Sun, 9 Nov 2025 14:05:45 +0900 Subject: [PATCH 3/4] Update README to clarify usage of placeholders in routing_key configuration --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 396026b..222e177 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ fluentd >= 0.14.0 exchange foo # required: name of exchange exchange_type fanout # required: type of exchange e.g. topic, direct exchange_durable false - routing_key hoge # if not specified, the tag is used + routing_key hoge # if not specified, the tag is used. You can use placeholders like ${tag}, ${time}, ${record["key"]} here. heartbeat 10 # integer as seconds or :server (interval specified by server) @type json # or msgpack, ltsv, none From 4b2f13922a81bdb8b1c680738648f5cb57f25ffb Mon Sep 17 00:00:00 2001 From: Nam Pham Date: Sun, 9 Nov 2025 14:09:31 +0900 Subject: [PATCH 4/4] remove comment --- lib/fluent/plugin/out_rabbitmq.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/fluent/plugin/out_rabbitmq.rb b/lib/fluent/plugin/out_rabbitmq.rb index 0c6fd7f..91a4f4d 100644 --- a/lib/fluent/plugin/out_rabbitmq.rb +++ b/lib/fluent/plugin/out_rabbitmq.rb @@ -115,8 +115,6 @@ def configure(conf) @formatter = formatter_create(default_type: @type) @use_evaluated_routing_key = @routing_key && @routing_key.match?(PLACEHOLDER_REGEX) - - #put "[INFO] RabbitMQ Output plugin configured. routing_key=#{routing_key}, use_evaluated_routing_key=#{use_evaluated_routing_key}" end def multi_workers_ready?