diff --git a/README.md b/README.md index 396026b..222e177 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ fluentd >= 0.14.0 exchange foo # required: name of exchange exchange_type fanout # required: type of exchange e.g. topic, direct exchange_durable false - routing_key hoge # if not specified, the tag is used + routing_key hoge # if not specified, the tag is used. You can use placeholders like ${tag}, ${time}, ${record["key"]} here. heartbeat 10 # integer as seconds or :server (interval specified by server) @type json # or msgpack, ltsv, none diff --git a/lib/fluent/plugin/out_rabbitmq.rb b/lib/fluent/plugin/out_rabbitmq.rb index 30e0d53..91a4f4d 100644 --- a/lib/fluent/plugin/out_rabbitmq.rb +++ b/lib/fluent/plugin/out_rabbitmq.rb @@ -20,7 +20,10 @@ module Fluent::Plugin class RabbitMQOutput < Output Fluent::Plugin.register_output("rabbitmq", self) - helpers :formatter, :inject, :compat_parameters + helpers :formatter, :inject, :compat_parameters, :event_emitter + + PLACEHOLDER_RECORD = /\$\{record\["([^"]+)"\]\}/.freeze + PLACEHOLDER_REGEX = /\$\{[^}]+\}/.freeze config_section :format do config_set_default :@type, "json" @@ -110,6 +113,8 @@ def configure(conf) @publish_options[:app_id] = @app_id if @app_id @formatter = formatter_create(default_type: @type) + + @use_evaluated_routing_key = @routing_key && @routing_key.match?(PLACEHOLDER_REGEX) end def multi_workers_ready? @@ -137,12 +142,11 @@ def shutdown super end - def set_publish_options(tag, time, record) + def set_publish_options(tag, time, record, chunk=nil) + rk = resolve_routing_key(tag, time, record, chunk) + @publish_options[:timestamp] = time.to_i if @timestamp - - if @exchange_type != "fanout" - @publish_options[:routing_key] = @routing_key || tag - end + @publish_options[:routing_key] = rk if @exchange_type != "fanout" if @id_key id = record[@id_key] @@ -163,11 +167,29 @@ def write(chunk) tag = chunk.metadata.tag chunk.each do |time, record| - set_publish_options(tag, time, record) + set_publish_options(tag, time, record, chunk) record = inject_values_to_record(tag, time, record) buf = @formatter.format(tag, time, record) @bunny_exchange.publish(buf, @publish_options) end end + + def resolve_routing_key(tag, time, record, chunk=nil) + return @routing_key || tag unless @use_evaluated_routing_key + + if chunk + extract_placeholders(@routing_key, chunk) + else + expand_placeholders_runtime(@routing_key, tag, time, record) + end + end + + def expand_placeholders_runtime(str, tag, time, record) + s = str.dup + s.gsub!('${tag}', tag.to_s) + s.gsub!('${time}', time.to_i.to_s) + s.gsub!(PLACEHOLDER_RECORD) { record[$1] ? record[$1].to_s : "" } + s + end end end diff --git a/test/plugin/test_out_rabbitmq.rb b/test/plugin/test_out_rabbitmq.rb index bb15972..b0dc3fb 100644 --- a/test/plugin/test_out_rabbitmq.rb +++ b/test/plugin/test_out_rabbitmq.rb @@ -209,4 +209,24 @@ def test_buffered_emit assert_equal(record, JSON.parse(body)) assert_equal(@time, properties[:timestamp].to_i) end + + def test_dynamic_routing_key_from_record + d = create_driver(%[ + exchange test_out_dynamic + exchange_type topic + routing_key test_out_dynamic.${record["build_id"]} + ]) + + queue = @channel.queue("test_out_dynamic_123") + topic_exchange = Bunny::Exchange.new(@channel, "topic", "test_out_dynamic") + queue.bind(topic_exchange, routing_key: "test_out_dynamic.123") + + record = {"build_id" => "123", "msg" => "hello dynamic"} + d.run(default_tag: "app.build") do + d.feed(@time, record) + end + + _, _, body = queue.pop + assert_equal(record, JSON.parse(body)) + end end