Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fluentd >= 0.14.0
exchange foo # required: name of exchange
exchange_type fanout # required: type of exchange e.g. topic, direct
exchange_durable false
routing_key hoge # if not specified, the tag is used
routing_key hoge # if not specified, the tag is used. You can use placeholders like ${tag}, ${time}, ${record["key"]} here.
heartbeat 10 # integer as seconds or :server (interval specified by server)
<format>
@type json # or msgpack, ltsv, none
Expand Down
36 changes: 29 additions & 7 deletions lib/fluent/plugin/out_rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ module Fluent::Plugin
class RabbitMQOutput < Output
Fluent::Plugin.register_output("rabbitmq", self)

helpers :formatter, :inject, :compat_parameters
helpers :formatter, :inject, :compat_parameters, :event_emitter

PLACEHOLDER_RECORD = /\$\{record\["([^"]+)"\]\}/.freeze
PLACEHOLDER_REGEX = /\$\{[^}]+\}/.freeze

config_section :format do
config_set_default :@type, "json"
Expand Down Expand Up @@ -110,6 +113,8 @@ def configure(conf)
@publish_options[:app_id] = @app_id if @app_id

@formatter = formatter_create(default_type: @type)

@use_evaluated_routing_key = @routing_key && @routing_key.match?(PLACEHOLDER_REGEX)
end

def multi_workers_ready?
Expand Down Expand Up @@ -137,12 +142,11 @@ def shutdown
super
end

def set_publish_options(tag, time, record)
def set_publish_options(tag, time, record, chunk=nil)
rk = resolve_routing_key(tag, time, record, chunk)

@publish_options[:timestamp] = time.to_i if @timestamp

if @exchange_type != "fanout"
@publish_options[:routing_key] = @routing_key || tag
end
@publish_options[:routing_key] = rk if @exchange_type != "fanout"

if @id_key
id = record[@id_key]
Expand All @@ -163,11 +167,29 @@ def write(chunk)
tag = chunk.metadata.tag

chunk.each do |time, record|
set_publish_options(tag, time, record)
set_publish_options(tag, time, record, chunk)
record = inject_values_to_record(tag, time, record)
buf = @formatter.format(tag, time, record)
@bunny_exchange.publish(buf, @publish_options)
end
end

def resolve_routing_key(tag, time, record, chunk=nil)
return @routing_key || tag unless @use_evaluated_routing_key

if chunk
extract_placeholders(@routing_key, chunk)
else
expand_placeholders_runtime(@routing_key, tag, time, record)
end
end

def expand_placeholders_runtime(str, tag, time, record)
s = str.dup
s.gsub!('${tag}', tag.to_s)
s.gsub!('${time}', time.to_i.to_s)
s.gsub!(PLACEHOLDER_RECORD) { record[$1] ? record[$1].to_s : "" }
s
end
end
end
20 changes: 20 additions & 0 deletions test/plugin/test_out_rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,24 @@ def test_buffered_emit
assert_equal(record, JSON.parse(body))
assert_equal(@time, properties[:timestamp].to_i)
end

def test_dynamic_routing_key_from_record
d = create_driver(%[
exchange test_out_dynamic
exchange_type topic
routing_key test_out_dynamic.${record["build_id"]}
])

queue = @channel.queue("test_out_dynamic_123")
topic_exchange = Bunny::Exchange.new(@channel, "topic", "test_out_dynamic")
queue.bind(topic_exchange, routing_key: "test_out_dynamic.123")

record = {"build_id" => "123", "msg" => "hello dynamic"}
d.run(default_tag: "app.build") do
d.feed(@time, record)
end

_, _, body = queue.pop
assert_equal(record, JSON.parse(body))
end
end