diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b2160cc
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,14 @@
+FROM ruby
+
+ENV HOME=/opt/fluent-plugin-splunk-http-eventcollector
+
+COPY Gemfile ${HOME}/Gemfile
+COPY fluent-plugin-splunk-http-eventcollector.gemspec ${HOME}/fluent-plugin-splunk-http-eventcollector.gemspec
+
+WORKDIR ${HOME}
+
+RUN bundle install
+
+COPY . ${HOME}
+
+CMD ["rake", "test"]
diff --git a/README.md b/README.md
index cf34039..7675ee1 100644
--- a/README.md
+++ b/README.md
@@ -197,6 +197,21 @@ Put the following lines to your fluent.conf:
   format kvp
 </match>
 
+# log files containing nested JSON
+<match **>
+  type splunk-http-eventcollector
+  server splunk.example.com:8089
+  all_items true
+  nested_json true
+</match>
+
+# log metadata in addition to the event
+<match **>
+  type splunk-http-eventcollector
+  server splunk.example.com:8089
+  fields { "is_test_log": true }
+</match>
+
 ## Contributing
 
 1. Fork it
diff --git a/fluent-plugin-splunk-http-eventcollector.gemspec b/fluent-plugin-splunk-http-eventcollector.gemspec
index 7b4c95d..177a81b 100644
--- a/fluent-plugin-splunk-http-eventcollector.gemspec
+++ b/fluent-plugin-splunk-http-eventcollector.gemspec
@@ -3,7 +3,7 @@ $:.push File.expand_path("../lib", __FILE__)
 
 Gem::Specification.new do |gem|
   gem.name          = "fluent-plugin-splunk-http-eventcollector"
-  gem.version       = "0.2.0"
+  gem.version       = "0.4.1"
   gem.authors       = ["Bryce Chidester"]
   gem.email         = ["bryce.chidester@calyptix.com"]
   gem.summary       = "Splunk output plugin for Fluentd"
diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb
index 19080f6..25854e6 100644
--- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb
+++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb
@@ -30,6 +30,8 @@
 # http://dev.splunk.com/view/event-collector/SP-CAAAE6M
 # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
 
+require 'date'
+
 module Fluent
 
 class SplunkHTTPEventcollectorOutput < BufferedOutput
@@ -47,10 +49,13 @@ class SplunkHTTPEventcollectorOutput < BufferedOutput
   config_param :index, :string, :default => 'main'
   config_param :all_items, :bool, :default => false
 
+  config_param :iso8601_time, :string, :default => nil
   config_param :sourcetype, :string, :default => 'fluentd'
   config_param :source, :string, :default => nil
   config_param :post_retry_max, :integer, :default => 5
   config_param :post_retry_interval, :integer, :default => 5
+  config_param :nested_json, :bool, :default => false
+  config_param :fields, :hash, :default => {}
 
   # TODO Find better upper limits
   config_param :batch_size_limit, :integer, :default => 262144 # 65535
@@ -115,6 +120,12 @@ def configure(conf)
     @placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)
 
     @hostname = Socket.gethostname
+
+    unless @fields.empty?
+      @fields = inject_env_vars_into_fields
+      @fields = inject_files_into_fields
+    end
+
     # TODO Add other robust input/syntax checks.
   end # configure
 
@@ -158,7 +169,7 @@ def format(tag, time, record)
     placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)
 
     splunk_object = Hash[
-        "time" => time.to_i,
+        "time" => handle_get_time(time, placeholders),
         "source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end,
         "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
         "host" => @placeholder_expander.expand(@host.to_s, placeholders),
@@ -171,10 +182,18 @@ def format(tag, time, record)
       splunk_object["event"] = convert_to_utf8(record["message"])
     end
 
+    unless @fields.empty?
+      splunk_object["fields"] = @fields
+    end
+
     json_event = splunk_object.to_json
     #log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}"
     #log.debug "format: returning: #{[tag, record].to_json.to_s}"
-    json_event
+    if @nested_json
+      json_event + "\n"
+    else
+      json_event
+    end
   end
 
   # By this point, fluentd has decided its buffer is full and it's time to flush
@@ -190,11 +209,15 @@ def format(tag, time, record)
   def write(chunk)
     log.trace "splunk-http-eventcollector(write) called"
 
-    # Break the concatenated string of JSON-formatted events into an Array
-    split_chunk = chunk.read.split("}{").each do |x|
-      # Reconstruct the opening{/closing} that #split() strips off.
-      x.prepend("{") unless x.start_with?("{")
-      x << "}" unless x.end_with?("}")
+    if @nested_json
+      split_chunk = chunk.read.split("\n")
+    else
+      # Break the concatenated string of JSON-formatted events into an Array
+      split_chunk = chunk.read.split("}{").each do |x|
+        # Reconstruct the opening{/closing} that #split() strips off.
+        x.prepend("{") unless x.start_with?("{")
+        x << "}" unless x.end_with?("}")
+      end
     end
     log.debug "Pushing #{numfmt(split_chunk.size)} events (" +
       "#{numfmt(chunk.read.bytesize)} bytes) to Splunk."
@@ -265,7 +288,7 @@ def push_buffer(body)
         next
       elsif response.code.match(/^40/)
         # user error
-        log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}"
+        log.error "#{@splunk_uri}: #{response.code} (#{response.message})\nReq: #{body}\nRes: #{response.body}"
         break
       elsif c < @post_retry_max
         # retry
@@ -320,5 +343,35 @@ def convert_to_utf8(input)
       end
     end
   end
+
+  # Environment variables are passed in with the following format:
+  #   @{ENV['NAME_OF_ENV_VAR']}
+  def inject_env_vars_into_fields
+    @fields.each { |_, field_value|
+      match_data = field_value.to_s.match(/^@\{ENV\['(?<env_name>.+)'\]\}$/)
+      if match_data && match_data["env_name"]
+        field_value.replace(ENV[match_data["env_name"]])
+      end
+    }
+  end
+
+  def inject_files_into_fields
+    @fields.each { |_, field_value|
+      match_data = field_value.to_s.match(/^@\{FILE\['(?<file_path>.+)'\]\}$/)
+      if match_data && match_data["file_path"]
+        field_value.replace(IO.read(match_data["file_path"]))
+      end
+    }
+  end
+
+  def handle_get_time(emitted_at_timestamp, placeholders)
+    if @iso8601_time.nil?
+      emitted_at_timestamp.to_f
+    else
+      time = @placeholder_expander.expand(@iso8601_time, placeholders)
+      DateTime.iso8601(time).to_time.to_f
+    end
+  end
+
 end # class SplunkHTTPEventcollectorOutput
 end # module Fluent
diff --git a/test/plugin/test_out_splunk-http-eventcollector.rb b/test/plugin/test_out_splunk-http-eventcollector.rb
index d1feda7..389e22f 100644
--- a/test/plugin/test_out_splunk-http-eventcollector.rb
+++ b/test/plugin/test_out_splunk-http-eventcollector.rb
@@ -120,7 +120,7 @@ def test_write_splitting
       batch_size_limit 250
     ])
 
-    time = Time.parse("2010-01-02 13:14:15 UTC").to_i
+    time = Time.parse("2010-01-02 13:14:15 UTC").to_f
     d.emit({"message" => "a" }, time)
     d.emit({"message" => "b" }, time)
     d.emit({"message" => "c" }, time)
@@ -176,4 +176,24 @@ def test_utf8
       body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: { some: { nested: " f-8", with: [" "," ","f-8"]}}},
       times: 1
   end
+
+  def test_write_fields
+    stub_request(:post, "https://localhost:8089/services/collector").
+      with(headers: {"Authorization" => "Splunk changeme"}).
+      to_return(body: '{"text":"Success","code":0}')
+
+    d = create_driver(CONFIG + %[
+      fields { "cluster": "aws" }
+      source ${record["source"]}
+    ])
+
+    time = Time.parse("2010-01-02 13:14:15 UTC").to_i
+    d.emit({ "message" => "a message", "source" => "source-from-record"}, time)
+    d.run
+
+    assert_requested :post, "https://localhost:8089/services/collector",
+      headers: {"Authorization" => "Splunk changeme"},
+      body: { time: time, source: "source-from-record", sourcetype: "fluentd", host: "", index: "main", event: "a message", fields: { cluster: "aws" } },
+      times: 1
+  end
 end
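
With nested_json enabled, format appends "\n" to each serialized event so that write can split the buffered chunk on newlines rather than on the "}{" heuristic, which can mis-split when an event's own JSON contains a literal "}{" (for example inside a string value). A minimal standalone sketch of that round trip, using hypothetical sample events:

    require 'json'

    # Two events as format() would serialize them with nested_json enabled:
    # one JSON document per line.
    events = [
      { "time" => 1262438055.0, "event" => { "msg" => "a }{ b" } },
      { "time" => 1262438055.0, "event" => { "nested" => { "deep" => true } } },
    ]
    chunk = events.map { |e| e.to_json + "\n" }.join

    # write() splits on newlines and recovers every event intact, even though
    # the first payload contains a "}{" that would confuse the brace splitter.
    chunk.split("\n").each { |line| p JSON.parse(line) }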
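
The fields support resolves @{ENV['...']} and @{FILE['...']} placeholders once, at configure time. A minimal sketch of that behavior, assuming (as the patch does) that the referenced environment variable and file exist; resolve_field_placeholders and the sample hash are illustrative, not plugin API:

    # Resolve "@{ENV['NAME']}" placeholders against the environment and
    # "@{FILE['path']}" placeholders against the filesystem, mutating the
    # string values in place as the inject_* helpers above do.
    def resolve_field_placeholders(fields)
      fields.each_value do |value|
        if (m = value.to_s.match(/^@\{ENV\['(?<env_name>.+)'\]\}$/))
          value.replace(ENV[m["env_name"]])
        elsif (m = value.to_s.match(/^@\{FILE\['(?<file_path>.+)'\]\}$/))
          value.replace(IO.read(m["file_path"]))
        end
      end
    end

    ENV["CLUSTER_NAME"] = "aws-prod"
    fields = { "cluster" => "@{ENV['CLUSTER_NAME']}", "is_test_log" => true }
    resolve_field_placeholders(fields)
    p fields  # => {"cluster"=>"aws-prod", "is_test_log"=>true}

Note that, like the patch, this raises if the environment variable is unset, because String#replace requires a String argument.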
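
Finally, the iso8601_time path in handle_get_time expands the configured placeholder per event and parses the result into the epoch-seconds float Splunk's HTTP Event Collector expects, e.g. for a hypothetical record timestamp:

    require 'date'

    record_time = "2010-01-02T13:14:15Z"  # e.g. expanded from ${record["timestamp"]}
    p DateTime.iso8601(record_time).to_time.to_f  # => 1262438055.0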