diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b2160cc
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,14 @@
+FROM ruby
+
+ENV HOME=/opt/fluent-plugin-splunk-http-eventcollector
+
+COPY Gemfile ${HOME}/Gemfile
+COPY fluent-plugin-splunk-http-eventcollector.gemspec ${HOME}/fluent-plugin-splunk-http-eventcollector.gemspec
+
+WORKDIR ${HOME}
+
+RUN bundle install
+
+COPY . ${HOME}
+
+CMD ["rake", "test"]
diff --git a/README.md b/README.md
index cf34039..7675ee1 100644
--- a/README.md
+++ b/README.md
@@ -197,6 +197,21 @@ Put the following lines to your fluent.conf:
format kvp
+ # log files containing nested JSON
+ <match **>
+ type splunk-http-eventcollector
+ server splunk.example.com:8089
+ all_items true
+ nested_json true
+ </match>
+
+ # log metadata in addition to the event
+ <match **>
+ type splunk-http-eventcollector
+ server splunk.example.com:8089
+ fields { "is_test_log": true }
+ </match>
+
## Contributing
1. Fork it
diff --git a/fluent-plugin-splunk-http-eventcollector.gemspec b/fluent-plugin-splunk-http-eventcollector.gemspec
index 7b4c95d..177a81b 100644
--- a/fluent-plugin-splunk-http-eventcollector.gemspec
+++ b/fluent-plugin-splunk-http-eventcollector.gemspec
@@ -3,7 +3,7 @@ $:.push File.expand_path("../lib", __FILE__)
Gem::Specification.new do |gem|
gem.name = "fluent-plugin-splunk-http-eventcollector"
- gem.version = "0.2.0"
+ gem.version = "0.4.1"
gem.authors = ["Bryce Chidester"]
gem.email = ["bryce.chidester@calyptix.com"]
gem.summary = "Splunk output plugin for Fluentd"
diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb
index 19080f6..25854e6 100644
--- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb
+++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb
@@ -30,6 +30,8 @@
# http://dev.splunk.com/view/event-collector/SP-CAAAE6M
# http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
+require 'date'
+
module Fluent
class SplunkHTTPEventcollectorOutput < BufferedOutput
@@ -47,10 +49,13 @@ class SplunkHTTPEventcollectorOutput < BufferedOutput
config_param :index, :string, :default => 'main'
config_param :all_items, :bool, :default => false
+ config_param :iso8601_time, :string, :default => nil
config_param :sourcetype, :string, :default => 'fluentd'
config_param :source, :string, :default => nil
config_param :post_retry_max, :integer, :default => 5
config_param :post_retry_interval, :integer, :default => 5
+ config_param :nested_json, :bool, :default => false
+ config_param :fields, :hash, :default => {}
# TODO Find better upper limits
config_param :batch_size_limit, :integer, :default => 262144 # 65535
@@ -115,6 +120,12 @@ def configure(conf)
@placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)
@hostname = Socket.gethostname
+
+ unless @fields.empty?
+ @fields = inject_env_vars_into_fields
+ @fields = inject_files_into_fields
+ end
+
# TODO Add other robust input/syntax checks.
end # configure
@@ -158,7 +169,7 @@ def format(tag, time, record)
placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)
splunk_object = Hash[
- "time" => time.to_i,
+ "time" => handle_get_time(time, placeholders),
"source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end,
"sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
"host" => @placeholder_expander.expand(@host.to_s, placeholders),
@@ -171,10 +182,18 @@ def format(tag, time, record)
splunk_object["event"] = convert_to_utf8(record["message"])
end
+ unless @fields.empty?
+ splunk_object["fields"] = @fields
+ end
+
json_event = splunk_object.to_json
#log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}"
#log.debug "format: returning: #{[tag, record].to_json.to_s}"
- json_event
+ if @nested_json
+ json_event + "\n"
+ else
+ json_event
+ end
end
# By this point, fluentd has decided its buffer is full and it's time to flush
@@ -190,11 +209,15 @@ def format(tag, time, record)
def write(chunk)
log.trace "splunk-http-eventcollector(write) called"
- # Break the concatenated string of JSON-formatted events into an Array
- split_chunk = chunk.read.split("}{").each do |x|
- # Reconstruct the opening{/closing} that #split() strips off.
- x.prepend("{") unless x.start_with?("{")
- x << "}" unless x.end_with?("}")
+ if @nested_json
+ split_chunk = chunk.read.split("\n")
+ else
+ # Break the concatenated string of JSON-formatted events into an Array
+ split_chunk = chunk.read.split("}{").each do |x|
+ # Reconstruct the opening{/closing} that #split() strips off.
+ x.prepend("{") unless x.start_with?("{")
+ x << "}" unless x.end_with?("}")
+ end
end
log.debug "Pushing #{numfmt(split_chunk.size)} events (" +
"#{numfmt(chunk.read.bytesize)} bytes) to Splunk."
@@ -265,7 +288,7 @@ def push_buffer(body)
next
elsif response.code.match(/^40/)
# user error
- log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}"
+ log.error "#{@splunk_uri}: #{response.code} (#{response.message})\nReq: #{body}\nRes: #{response.body}"
break
elsif c < @post_retry_max
# retry
@@ -320,5 +343,35 @@ def convert_to_utf8(input)
end
end
end
+
+ # Environment variables are passed in with the following format:
+ # @{ENV['NAME_OF_ENV_VAR']}
+ def inject_env_vars_into_fields
+ @fields.each { | _, field_value|
+ match_data = field_value.to_s.match(/^@\{ENV\['(?<env_name>.+)'\]\}$/)
+ if match_data && match_data["env_name"]
+ field_value.replace(ENV[match_data["env_name"]])
+ end
+ }
+ end
+
+ def inject_files_into_fields
+ @fields.each { | _, field_value |
+ match_data = field_value.to_s.match(/^@\{FILE\['(?<file_path>.+)'\]\}$/)
+ if match_data && match_data["file_path"]
+ field_value.replace(IO.read(match_data["file_path"]))
+ end
+ }
+ end
+
+ def handle_get_time(emitted_at_timestamp, placeholders)
+ if @iso8601_time.nil?
+ emitted_at_timestamp.to_f
+ else
+ time = @placeholder_expander.expand(@iso8601_time, placeholders)
+ DateTime.iso8601(time).to_time.to_f
+ end
+ end
+
end # class SplunkHTTPEventcollectorOutput
end # module Fluent
diff --git a/test/plugin/test_out_splunk-http-eventcollector.rb b/test/plugin/test_out_splunk-http-eventcollector.rb
index d1feda7..389e22f 100644
--- a/test/plugin/test_out_splunk-http-eventcollector.rb
+++ b/test/plugin/test_out_splunk-http-eventcollector.rb
@@ -120,7 +120,7 @@ def test_write_splitting
batch_size_limit 250
])
- time = Time.parse("2010-01-02 13:14:15 UTC").to_i
+ time = Time.parse("2010-01-02 13:14:15 UTC").to_f
d.emit({"message" => "a" }, time)
d.emit({"message" => "b" }, time)
d.emit({"message" => "c" }, time)
@@ -176,4 +176,24 @@ def test_utf8
body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: { some: { nested: " f-8", with: [" "," ","f-8"]}}},
times: 1
end
+
+ def test_write_fields
+ stub_request(:post, "https://localhost:8089/services/collector").
+ with(headers: {"Authorization" => "Splunk changeme"}).
+ to_return(body: '{"text":"Success","code":0}')
+
+ d = create_driver(CONFIG + %[
+ fields { "cluster": "aws" }
+ source ${record["source"]}
+ ])
+
+ time = Time.parse("2010-01-02 13:14:15 UTC").to_i
+ d.emit({ "message" => "a message", "source" => "source-from-record"}, time)
+ d.run
+
+ assert_requested :post, "https://localhost:8089/services/collector",
+ headers: {"Authorization" => "Splunk changeme"},
+ body: { time: time, source: "source-from-record", sourcetype: "fluentd", host: "", index: "main", event: "a message", fields: { cluster: "aws" } },
+ times: 1
+ end
end