From 9af56863ce25b15400cd0feacefb1e9065082a3c Mon Sep 17 00:00:00 2001 From: Bryce Chidester Date: Fri, 21 Apr 2017 01:44:18 -0700 Subject: [PATCH] Throw the project through rubocop Cleanup where possible, disable cops that aren't important. --- .rubocop.yml | 36 + Gemfile | 5 +- Rakefile | 7 +- ...-plugin-splunk-http-eventcollector.gemspec | 47 +- .../plugin/out_splunk-http-eventcollector.rb | 635 +++++++++--------- test/helper.rb | 8 +- .../test_out_splunk-http-eventcollector.rb | 225 ++++--- 7 files changed, 528 insertions(+), 435 deletions(-) create mode 100644 .rubocop.yml diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..6fca9d6 --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,36 @@ +# Sure, these are nice but disable them for now. +Metrics/ClassLength: + Enabled: false + +Metrics/MethodLength: + Enabled: false + +Metrics/AbcSize: + Enabled: false + +Metrics/PerceivedComplexity: + Enabled: false + +Metrics/CyclomaticComplexity: + Enabled: false + +Style/FileName: + Enabled: false + +Style/GuardClause: + Enabled: false + +Style/MethodName: + Enabled: false + +Style/ClassAndModuleChildren: + Enabled: false + +Style/Documentation: + Enabled: false + +Style/MethodMissing: + Enabled: false + +Style/GlobalVars: + Enabled: false diff --git a/Gemfile b/Gemfile index ab130ad..d3f1dcd 100644 --- a/Gemfile +++ b/Gemfile @@ -1,6 +1,7 @@ source 'https://rubygems.org' -# Specify your gem's dependencies in fluent-plugin-splunk-http-eventcollector.gemspec +# Specify your gem's dependencies in +# fluent-plugin-splunk-http-eventcollector.gemspec gemspec -#gem "test-unit" +# gem "test-unit" diff --git a/Rakefile b/Rakefile index 0c77a5b..aac7776 100644 --- a/Rakefile +++ b/Rakefile @@ -1,4 +1,4 @@ -require "bundler/gem_tasks" +require 'bundler/gem_tasks' require 'rake/testtask' Rake::TestTask.new(:test) do |test| @@ -7,4 +7,7 @@ Rake::TestTask.new(:test) do |test| test.verbose = true end -task :default => :test +require 'rubocop/rake_task' +RuboCop::RakeTask.new + +task default: %i[rubocop test] diff --git a/fluent-plugin-splunk-http-eventcollector.gemspec b/fluent-plugin-splunk-http-eventcollector.gemspec index 7b4c95d..eb1ad20 100644 --- a/fluent-plugin-splunk-http-eventcollector.gemspec +++ b/fluent-plugin-splunk-http-eventcollector.gemspec @@ -1,28 +1,31 @@ # -*- encoding: utf-8 -*- -$:.push File.expand_path("../lib", __FILE__) + +$LOAD_PATH.push File.expand_path('../lib', __FILE__) Gem::Specification.new do |gem| - gem.name = "fluent-plugin-splunk-http-eventcollector" - gem.version = "0.2.0" - gem.authors = ["Bryce Chidester"] - gem.email = ["bryce.chidester@calyptix.com"] - gem.summary = "Splunk output plugin for Fluentd" - gem.description = "Splunk output plugin (HTTP Event Collector) for Fluentd event collector" - gem.homepage = "https://github.com/brycied00d/fluent-plugin-splunk-http-eventcollector" + gem.name = 'fluent-plugin-splunk-http-eventcollector' + gem.version = '0.2.0' + gem.authors = ['Bryce Chidester'] + gem.email = ['bryce.chidester@calyptix.com'] + gem.summary = 'Splunk output plugin for Fluentd' + gem.description = 'Splunk output plugin (HTTP Event Collector) for '\ + 'Fluentd event collector' + gem.homepage = 'https://github.com/brycied00d/fluent-plugin-splunk-http-eventcollector' gem.license = 'BSD-2-Clause' - gem.extra_rdoc_files = [ "LICENSE", "README.md" ] - gem.files = [ ".gitignore", "Gemfile", "LICENSE", "README.md", - "Rakefile", "test/helper.rb", - "fluent-plugin-splunk-http-eventcollector.gemspec", - "lib/fluent/plugin/out_splunk-http-eventcollector.rb", - "test/plugin/test_out_splunk-http-eventcollector.rb" ] - gem.test_files = [ "test/helper.rb", - "test/plugin/test_out_splunk-http-eventcollector.rb" ] - gem.require_paths = ["lib"] + gem.extra_rdoc_files = ['LICENSE', 'README.md'] + gem.files = ['.gitignore', 'Gemfile', 'LICENSE', 'README.md', + 'Rakefile', 'test/helper.rb', + 'fluent-plugin-splunk-http-eventcollector.gemspec', + 'lib/fluent/plugin/out_splunk-http-eventcollector.rb', + 'test/plugin/test_out_splunk-http-eventcollector.rb'] + gem.test_files = ['test/helper.rb', + 'test/plugin/test_out_splunk-http-eventcollector.rb'] + gem.require_paths = ['lib'] - gem.add_development_dependency "rake" - gem.add_development_dependency "test-unit", '~> 3.1' - gem.add_development_dependency "webmock", '~> 2.3', '>= 2.3.2' - gem.add_runtime_dependency "fluentd", '~> 0.12.12' - gem.add_runtime_dependency "net-http-persistent", '~> 2.9' + gem.add_development_dependency 'rake' + gem.add_development_dependency 'rubocop' + gem.add_development_dependency 'test-unit', '~> 3.1' + gem.add_development_dependency 'webmock', '~> 2.3', '>= 2.3.2' + gem.add_runtime_dependency 'fluentd', '~> 0.12.12' + gem.add_runtime_dependency 'net-http-persistent', '~> 2.9' end diff --git a/lib/fluent/plugin/out_splunk-http-eventcollector.rb b/lib/fluent/plugin/out_splunk-http-eventcollector.rb index ff59dda..b80dc9d 100644 --- a/lib/fluent/plugin/out_splunk-http-eventcollector.rb +++ b/lib/fluent/plugin/out_splunk-http-eventcollector.rb @@ -1,324 +1,345 @@ -=begin - -Copyright (c) 2015, Bryce Chidester (Calyptix Security) -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -=end +# +# Copyright (c) 2015, Bryce Chidester (Calyptix Security) +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# # Splunk HTTP Event collector docs # http://dev.splunk.com/view/event-collector/SP-CAAAE6M # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector module Fluent -class SplunkHTTPEventcollectorOutput < BufferedOutput - - Plugin.register_output('splunk-http-eventcollector', self) - - config_param :test_mode, :bool, :default => false - - config_param :server, :string, :default => 'localhost:8088' - config_param :verify, :bool, :default => true - config_param :token, :string, :default => nil - - # Event parameters - config_param :protocol, :string, :default => 'https' - config_param :host, :string, :default => nil - config_param :index, :string, :default => 'main' - config_param :all_items, :bool, :default => false - - config_param :sourcetype, :string, :default => 'fluentd' - config_param :source, :string, :default => nil - config_param :post_retry_max, :integer, :default => 5 - config_param :post_retry_interval, :integer, :default => 5 - - # TODO Find better upper limits - config_param :batch_size_limit, :integer, :default => 262144 # 65535 - #config_param :batch_event_limit, :integer, :default => 100 - - # Whether to allow non-UTF-8 characters in user logs. If set to true, any - # non-UTF-8 character would be replaced by the string specified by - # 'non_utf8_replacement_string'. If set to false, any non-UTF-8 character - # would trigger the plugin to error out. - config_param :coerce_to_utf8, :bool, :default => true - - # If 'coerce_to_utf8' is set to true, any non-UTF-8 character would be - # replaced by the string specified here. - config_param :non_utf8_replacement_string, :string, :default => ' ' - - # Called on class load (class initializer) - def initialize - super - log.trace "splunk-http-eventcollector(initialize) called" - require 'net/http/persistent' - require 'openssl' - end # initialize - - # Thanks to - # https://github.com/kazegusuri/fluent-plugin-prometheus/blob/348c112d/lib/fluent/plugin/prometheus.rb - def self.placeholder_expander(log) - # Use internal class in order to expand placeholder - if defined?(Fluent::Filter) # for v0.12, built-in PlaceholderExpander - begin - require 'fluent/plugin/filter_record_transformer' - if defined?(Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander) - # for v0.14 - return Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander.new(log: log) - else - # for v0.12 - return Fluent::RecordTransformerFilter::PlaceholderExpander.new(log: log) - end - rescue LoadError => e - raise ConfigError, "cannot find filter_record_transformer plugin: #{e.message}" + class SplunkHTTPEventcollectorOutput < BufferedOutput + Plugin.register_output('splunk-http-eventcollector', self) + + config_param :test_mode, :bool, default: false + + config_param :server, :string, default: 'localhost:8088' + config_param :verify, :bool, default: true + config_param :token, :string, default: nil + + # Event parameters + config_param :protocol, :string, default: 'https' + config_param :host, :string, default: nil + config_param :index, :string, default: 'main' + config_param :all_items, :bool, default: false + + config_param :sourcetype, :string, default: 'fluentd' + config_param :source, :string, default: nil + config_param :post_retry_max, :integer, default: 5 + config_param :post_retry_interval, :integer, default: 5 + + # TODO: Find better upper limits + config_param :batch_size_limit, :integer, default: 262_144 # 65535 + # config_param :batch_event_limit, :integer, :default => 100 + + # Whether to allow non-UTF-8 characters in user logs. If set to true, any + # non-UTF-8 character would be replaced by the string specified by + # 'non_utf8_replacement_string'. If set to false, any non-UTF-8 character + # would trigger the plugin to error out. + config_param :coerce_to_utf8, :bool, default: true + + # If 'coerce_to_utf8' is set to true, any non-UTF-8 character would be + # replaced by the string specified here. + config_param :non_utf8_replacement_string, :string, default: ' ' + + # Called on class load (class initializer) + def initialize + super + log.trace { 'splunk-http-eventcollector(initialize) called'.freeze } + require 'net/http/persistent' + require 'openssl' + end # initialize + + # Thanks to + # https://github.com/kazegusuri/fluent-plugin-prometheus/blob/348c112d/lib/fluent/plugin/prometheus.rb + def self.placeholder_expander(log) + # Use internal class in order to expand placeholder + + require 'fluent/plugin/filter_record_transformer' + if defined? Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander + # for v0.14 + return Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander + .new(log: log) + else + # for v0.12 + return Fluent::RecordTransformerFilter::PlaceholderExpander + .new(log: log) end - else # for v0.10, use PlaceholderExapander in fluent-plugin-record-reformer plugin + rescue LoadError => e + raise ConfigError, 'cannot find filter_record_transformer plugin: '\ + "#{e.message}".freeze + end + + ## This method is called before starting. + ## 'conf' is a Hash that includes configuration parameters. + ## If the configuration is invalid, raise Fluent::ConfigError. + def configure(conf) + super + log.trace { 'splunk-http-eventcollector(configure) called'.freeze } begin - require 'fluent/plugin/out_record_reformer.rb' - return Fluent::RecordReformerOutput::PlaceholderExpander.new(log: log) - rescue LoadError => e - raise ConfigError, "cannot find fluent-plugin-record-reformer: #{e.message}" + @splunk_uri = URI "#{@protocol}://#{@server}/services/collector".freeze + rescue + raise ConfigError, 'Unable to parse the server into a URI.'.freeze end - end - end - - ## This method is called before starting. - ## 'conf' is a Hash that includes configuration parameters. - ## If the configuration is invalid, raise Fluent::ConfigError. - def configure(conf) - super - log.trace "splunk-http-eventcollector(configure) called" - begin - @splunk_uri = URI "#{@protocol}://#{@server}/services/collector" - rescue - raise ConfigError, "Unable to parse the server into a URI." - end - @placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log) - @hostname = Socket.gethostname - # TODO Add other robust input/syntax checks. - end # configure - - ## This method is called when starting. - ## Open sockets or files here. - def start - super - log.trace "splunk-http-eventcollector(start) called" - @http = Net::HTTP::Persistent.new 'fluent-plugin-splunk-http-eventcollector' - @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify - @http.override_headers['Content-Type'] = 'application/json' - @http.override_headers['User-Agent'] = 'fluent-plugin-splunk-http-eventcollector/0.0.1' - @http.override_headers['Authorization'] = "Splunk #{@token}" - - log.trace "initialized for splunk-http-eventcollector" - end - - ## This method is called when shutting down. - ## Shutdown the thread and close sockets or files here. - def shutdown - super - log.trace "splunk-http-eventcollector(shutdown) called" - - @http.shutdown - log.trace "shutdown from splunk-http-eventcollector" - end # shutdown - - ## This method is called when an event reaches to Fluentd. (like unbuffered emit()) - ## Convert the event to a raw string. - def format(tag, time, record) - #log.trace "splunk-http-eventcollector(format) called" - # Basic object for Splunk. Note explicit type-casting to avoid accidental errors. - - placeholder_values = { - 'tag' => tag, - 'tag_parts' => tag.split('.'), - 'hostname' => @hostname, - 'record' => record - } - - placeholders = @placeholder_expander.prepare_placeholders(placeholder_values) - - splunk_object = Hash[ - "time" => time.to_i, - "source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end, - "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders), - "host" => @host.to_s, - "index" => @placeholder_expander.expand(@index, placeholders) - ] - # TODO: parse different source types as expected: KVP, JSON, TEXT - if @all_items - splunk_object["event"] = convert_to_utf8(record) - else - splunk_object["event"] = convert_to_utf8(record["message"]) + @placeholder_expander = Fluent::SplunkHTTPEventcollectorOutput + .placeholder_expander(log) + @hostname = Socket.gethostname + # TODO: Add other robust input/syntax checks. + end # configure + + ## This method is called when starting. + ## Open sockets or files here. + def start + super + log.trace { 'splunk-http-eventcollector(start) called'.freeze } + @http = Net::HTTP::Persistent.new 'fluent-plugin-splunk-http-'\ + 'eventcollector'.freeze + @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify + @http.override_headers['Content-Type'] = 'application/json'.freeze + @http.override_headers['User-Agent'] = 'fluent-plugin-splunk-http-'\ + 'eventcollector/0.2.0'.freeze + @http.override_headers['Authorization'] = "Splunk #{@token}".freeze + + log.trace { 'initialized for splunk-http-eventcollector'.freeze } end - json_event = splunk_object.to_json - #log.debug "Generated JSON(#{json_event.class.to_s}): #{json_event.to_s}" - #log.debug "format: returning: #{[tag, record].to_json.to_s}" - json_event - end - - # By this point, fluentd has decided its buffer is full and it's time to flush - # it. chunk.read is a concatenated string of JSON.to_s objects. Simply POST - # them to Splunk and go about our life. - ## This method is called every flush interval. Write the buffer chunk - ## to files or databases here. - ## 'chunk' is a buffer chunk that includes multiple formatted - ## events. You can use 'data = chunk.read' to get all events and - ## 'chunk.open {|io| ... }' to get IO objects. - ## - ## NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins. - def write(chunk) - log.trace "splunk-http-eventcollector(write) called" - - # Break the concatenated string of JSON-formatted events into an Array - split_chunk = chunk.read.split("}{").each do |x| - # Reconstruct the opening{/closing} that #split() strips off. - x.prepend("{") unless x.start_with?("{") - x << "}" unless x.end_with?("}") - end - log.debug "Pushing #{numfmt(split_chunk.size)} events (" + - "#{numfmt(chunk.read.bytesize)} bytes) to Splunk." - # If fluentd is pushing too much data to Splunk at once, split up the payload - # Don't care about the number of events so much as the POST size (bytes) - #if split_chunk.size > @batch_event_limit - # log.warn "Fluentd is attempting to push #{numfmt(split_chunk.size)} " + - # "events in a single push to Splunk. The configured limit is " + - # "#{numfmt(@batch_event_limit)}." - #end - if chunk.read.bytesize > @batch_size_limit - log.warn "Fluentd is attempting to push #{numfmt(chunk.read.bytesize)} " + - "bytes in a single push to Splunk. The configured limit is " + - "#{numfmt(@batch_size_limit)} bytes." - newbuffer = Array.new - split_chunk_counter = 0 - split_chunk.each do |c| - split_chunk_counter = split_chunk_counter + 1 - #log.debug "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " + - # "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " + - # "c.bytesize=#{numfmt(c.bytesize)} ????" - if newbuffer.join.bytesize + c.bytesize < @batch_size_limit - #log.debug "Appended!" - newbuffer << c - else - # Reached the limit - push the current newbuffer.join, and reset - #log.debug "Would exceed limit. Flushing newbuffer and continuing." - log.debug "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " + - "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " + - "c.bytesize=#{numfmt(c.bytesize)} > #{numfmt(@batch_size_limit)}, " + - "flushing current buffer to Splunk." - push_buffer newbuffer.join - newbuffer = Array c - end # if/else buffer fits limit - end # split_chunk.each - # Push anything left over. - push_buffer newbuffer.join if newbuffer.size - return - else - return push_buffer chunk.read - end # if chunk.read.bytesize > @batch_size_limit - end # write - - def push_buffer(body) - post = Net::HTTP::Post.new @splunk_uri.request_uri - post.body = body - log.debug "POST #{@splunk_uri}" - if @test_mode - log.debug "TEST_MODE Payload: #{body}" - return - end - # retry up to :post_retry_max times - 1.upto(@post_retry_max) do |c| - response = @http.request @splunk_uri, post - log.debug "=>(#{c}/#{numfmt(@post_retry_max)}) #{response.code} " + - "(#{response.message})" - # TODO check the actual server response too (it's JSON) - if response.code == "200" # and... - # success - break - # TODO check 40X response within post_retry_max and retry - elsif response.code.match(/^50/) and c < @post_retry_max - # retry - log.warn "#{@splunk_uri}: Server error #{response.code} (" + - "#{response.message}). Retrying in #{@post_retry_interval} " + - "seconds.\n#{response.body}" - sleep @post_retry_interval - next - elsif response.code.match(/^40/) - # user error - log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}" - break - elsif c < @post_retry_max - # retry - log.debug "#{@splunk_uri}: Retrying..." - sleep @post_retry_interval - next + ## This method is called when shutting down. + ## Shutdown the thread and close sockets or files here. + def shutdown + super + log.trace { 'splunk-http-eventcollector(shutdown) called'.freeze } + + @http.shutdown + log.trace { 'shutdown from splunk-http-eventcollector'.freeze } + end # shutdown + + # This method is called when an event reaches to Fluentd. (like + # unbuffered emit()) + # Convert the event to a raw string. + def format(tag, time, record) + # log.trace { 'splunk-http-eventcollector(format) called'.freeze } + placeholder_values = { + 'tag' => tag, + 'tag_parts' => tag.split('.'), + 'hostname' => @hostname, + 'record' => record + } + + placeholders = @placeholder_expander + .prepare_placeholders(placeholder_values) + + # Basic object for Splunk. Note explicit type-casting to avoid accidental + # errors. + splunk_object = { + 'time' => time.to_i, + 'host' => @host.to_s + } + splunk_object['sourcetype'] = @placeholder_expander + .expand(@sourcetype, placeholders) + + splunk_object['index'] = @placeholder_expander + .expand(@index, placeholders) + + if @source.nil? + splunk_object['source'] = tag.to_s else - # other errors. fluentd will retry processing on exception - # FIXME: this may duplicate logs when using multiple buffers - raise "#{@splunk_uri}: #{response.message}\n#{response.body}" - end # If response.code - end # 1.upto(@post_retry_max) - end # push_buffer - - def numfmt(input) - input.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\1,').reverse - end # numfmt - - # Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any - # non-UTF-8 character would be replaced by the string specified by - # 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any - # non-UTF-8 character would trigger the plugin to error out. - # Thanks to - # https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284 - def convert_to_utf8(input) - if input.is_a?(Hash) - record = {} - input.each do |key, value| - record[convert_to_utf8(key)] = convert_to_utf8(value) + splunk_object['source'] = @placeholder_expander + .expand(@source, placeholders) end - return record - end - return input.map { |value| convert_to_utf8(value) } if input.is_a?(Array) - return input unless input.respond_to?(:encode) - - if @coerce_to_utf8 - input.encode( - 'utf-8', - invalid: :replace, - undef: :replace, - replace: @non_utf8_replacement_string) - else - begin - input.encode('utf-8') - rescue EncodingError - @log.error 'Encountered encoding issues potentially due to non ' \ - 'UTF-8 characters. To allow non-UTF-8 characters and ' \ - 'replace them with spaces, please set "coerce_to_utf8" ' \ - 'to true.' - raise + # TODO: parse different source types as expected: KVP, JSON, TEXT + if @all_items + splunk_object['event'] = convert_to_utf8(record) + else + splunk_object['event'] = convert_to_utf8(record['message']) end + + json_event = splunk_object.to_json.freeze + # log.debug do + # "Generated JSON(#{json_event.class}): #{json_event}"\ + # "format: returning: #{[tag, record].to_json}".freeze + # end + json_event end - end -end # class SplunkHTTPEventcollectorOutput -end # module Fluent + + # By this point, fluentd has decided its buffer is full and it's time to + # flush it. chunk.read is a concatenated string of JSON.to_s objects. Simply + # POST them to Splunk and go about our life. + # This method is called every flush interval. Write the buffer chunk + # to files or databases here. + # 'chunk' is a buffer chunk that includes multiple formatted + # events. You can use 'data = chunk.read' to get all events and + # 'chunk.open {|io| ... }' to get IO objects. + # + # NOTE! This method is called by internal thread, not Fluentd's main thread. + # This means that IO wait doesn't affect other plugins. + def write(chunk) + log.trace { 'splunk-http-eventcollector(write) called' } + + # Break the concatenated string of JSON-formatted events into an Array + split_chunk = chunk.read.split('}{').each do |x| + # Reconstruct the opening{/closing} that #split() strips off. + x.prepend('{') unless x.start_with? '{' + x << '}' unless x.end_with? '}' + end + log.debug do + "Pushing #{numfmt(split_chunk.size)} events (" \ + "#{numfmt(chunk.read.bytesize)} bytes) to Splunk.".freeze + end + # If fluentd is pushing too much data to Splunk at once, split up the + # payload. Don't care about the number of events so much as the POST + # size (bytes) + # if split_chunk.size > @batch_event_limit + # log.warn "Fluentd is attempting to push #{numfmt(split_chunk.size)} " + + # "events in a single push to Splunk. The configured limit is " + + # "#{numfmt(@batch_event_limit)}." + # end + if chunk.read.bytesize > @batch_size_limit + log.warn "Fluentd is attempting to push #{numfmt(chunk.read.bytesize)}"\ + ' bytes in a single push to Splunk. The configured limit is '\ + "#{numfmt(@batch_size_limit)} bytes.".freeze + newbuffer = [] + split_chunk_counter = 0 + split_chunk.each do |c| + split_chunk_counter += 1 + # log.debug do + # "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) "\ + # "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + "\ + # "c.bytesize=#{numfmt(c.bytesize)} ????".freeze + # end + if newbuffer.join.bytesize + c.bytesize < @batch_size_limit + # log.debug {"Appended!"} + newbuffer << c + else + # Reached the limit - push the current newbuffer.join, and reset + # log.debug do + # 'Would exceed limit. Flushing newbuffer and continuing.' + # end + log.debug do + "(#{numfmt(split_chunk_counter)}/#{numfmt(split_chunk.size)}) " \ + "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " \ + "c.bytesize=#{numfmt(c.bytesize)} > #{numfmt(@batch_size_limit)}"\ + ', flushing current buffer to Splunk.'.freeze + end + push_buffer newbuffer.join + newbuffer = Array c + end # if/else buffer fits limit + end # split_chunk.each + # Push anything left over. + push_buffer newbuffer.join if newbuffer.size + return + else + return push_buffer chunk.read + end # if chunk.read.bytesize > @batch_size_limit + end # write + + def push_buffer(body) + post = Net::HTTP::Post.new @splunk_uri.request_uri + post.body = body + log.debug { "POST #{@splunk_uri}".freeze } + if @test_mode + log.debug { "TEST_MODE Payload: #{body}".freeze } + return + end + # retry up to :post_retry_max times + 1.upto(@post_retry_max) do |c| + response = @http.request @splunk_uri, post + log.debug do + "=>(#{c}/#{numfmt(@post_retry_max)}) #{response.code} " \ + "(#{response.message})".freeze + end + # TODO: check the actual server response too (it's JSON) + if response.code == '200' # and... + # success + break + # TODO: check 40X response within post_retry_max and retry + elsif response.code.match(/^50/) && c < @post_retry_max + # retry + log.warn "#{@splunk_uri}: Server error #{response.code} ("\ + "#{response.message}). Retrying in #{@post_retry_interval} "\ + "seconds.\n#{response.body}".freeze + sleep @post_retry_interval + next + elsif response.code =~ /^40/ + # user error + log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n"\ + "#{response.body}".freeze + break + elsif c < @post_retry_max + # retry + log.debug { "#{@splunk_uri}: Retrying...".freeze } + sleep @post_retry_interval + next + else + # other errors. fluentd will retry processing on exception + # FIXME: this may duplicate logs when using multiple buffers + raise "#{@splunk_uri}: #{response.message}\n#{response.body}".freeze + end # If response.code + end # 1.upto(@post_retry_max) + end # push_buffer + + def numfmt(input) + input.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\1,').reverse + end # numfmt + + # Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any + # non-UTF-8 character would be replaced by the string specified by + # 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any + # non-UTF-8 character would trigger the plugin to error out. + # Thanks to + # https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284 + def convert_to_utf8(input) + if input.is_a? Hash + record = {} + input.each do |key, value| + record[convert_to_utf8(key)] = convert_to_utf8(value) + end + + return record + end + return input.map { |value| convert_to_utf8(value) } if input.is_a? Array + return input unless input.respond_to? :encode + + if @coerce_to_utf8 + input.encode( + 'utf-8'.freeze, + invalid: :replace, + undef: :replace, + replace: @non_utf8_replacement_string + ) + else + begin + input.encode('utf-8') + rescue EncodingError + @log.error 'Encountered encoding issues potentially due to non ' \ + 'UTF-8 characters. To allow non-UTF-8 characters and ' \ + 'replace them with spaces, please set "coerce_to_utf8" ' \ + 'to true.'.freeze + raise + end + end + end # convert_to_utf8 + end # class SplunkHTTPEventcollectorOutput +end # module Fluent diff --git a/test/helper.rb b/test/helper.rb index 03094c1..3b89e9d 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -4,7 +4,7 @@ Bundler.setup(:default, :development) rescue Bundler::BundlerError => e $stderr.puts e.message - $stderr.puts "Run `bundle install` to install missing gems" + $stderr.puts 'Run `bundle install` to install missing gems' exit e.status_code end require 'test/unit' @@ -12,13 +12,13 @@ $LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) $LOAD_PATH.unshift(File.dirname(__FILE__)) require 'fluent/test' -unless ENV.has_key?('VERBOSE') +unless ENV.key?('VERBOSE') nulllogger = Object.new - nulllogger.instance_eval {|obj| + nulllogger.instance_eval do |_obj| def method_missing(method, *args) # pass end - } + end $log = nulllogger end diff --git a/test/plugin/test_out_splunk-http-eventcollector.rb b/test/plugin/test_out_splunk-http-eventcollector.rb index d1feda7..5cd1d4b 100644 --- a/test/plugin/test_out_splunk-http-eventcollector.rb +++ b/test/plugin/test_out_splunk-http-eventcollector.rb @@ -5,14 +5,16 @@ def setup Fluent::Test.setup end - CONFIG = %[ + CONFIG = %( server localhost:8089 verify false token changeme - ] + ).freeze - def create_driver(conf=CONFIG, tag='test') - Fluent::Test::BufferedOutputTestDriver.new(Fluent::SplunkHTTPEventcollectorOutput, tag).configure(conf) + def create_driver(conf = CONFIG, tag = 'test') + Fluent::Test::BufferedOutputTestDriver + .new(Fluent::SplunkHTTPEventcollectorOutput, tag) + .configure(conf) end def test_configure @@ -23,68 +25,85 @@ def test_configure end def test_write - stub_request(:post, "https://localhost:8089/services/collector"). - to_return(body: '{"text":"Success","code":0}') + stub_request(:post, 'https://localhost:8089/services/collector') + .to_return(body: '{"text":"Success","code":0}') d = create_driver - time = Time.parse("2010-01-02 13:14:15 UTC").to_i - d.emit({ "message" => "a message"}, time) + time = Time.parse('2010-01-02 13:14:15 UTC').to_i + d.emit({ 'message' => 'a message' }, time) d.run - assert_requested :post, "https://localhost:8089/services/collector", - headers: { - "Authorization" => "Splunk changeme", - 'Content-Type' => 'application/json', - 'User-Agent' => 'fluent-plugin-splunk-http-eventcollector/0.0.1' - }, - body: { time: time, source:"test", sourcetype: "fluentd", host: "", index: "main", event: "a message" }, - times: 1 + assert_requested :post, 'https://localhost:8089/services/collector', + headers: { + 'Authorization' => 'Splunk changeme', + 'Content-Type' => 'application/json', + 'User-Agent' => 'fluent-plugin-splunk-http-'\ + 'eventcollector/0.2.0' + }, + body: { time: time, + source: 'test', + sourcetype: 'fluentd', + host: '', + index: 'main', + event: 'a message' }, + times: 1 end def test_expand - stub_request(:post, "https://localhost:8089/services/collector"). - to_return(body: '{"text":"Success","code":0}') + stub_request(:post, 'https://localhost:8089/services/collector') + .to_return(body: '{"text":"Success","code":0}') - d = create_driver(CONFIG + %[ + d = create_driver(CONFIG + %( source ${record["source"]} sourcetype ${tag_parts[0]} - ]) + )) - time = Time.parse("2010-01-02 13:14:15 UTC").to_i - d.emit({"message" => "a message", "source" => "source-from-record"}, time) + time = Time.parse('2010-01-02 13:14:15 UTC').to_i + d.emit({ 'message' => 'a message', 'source' => 'source-from-record' }, time) d.run - assert_requested :post, "https://localhost:8089/services/collector", - headers: {"Authorization" => "Splunk changeme"}, - body: { time: time, source: "source-from-record", sourcetype: "test", host: "", index: "main", event: "a message" }, - times: 1 + assert_requested :post, 'https://localhost:8089/services/collector', + headers: { 'Authorization' => 'Splunk changeme' }, + body: { time: time, + source: 'source-from-record', + sourcetype: 'test', + host: '', + index: 'main', + event: 'a message' }, + times: 1 end def test_4XX_error_retry - stub_request(:post, "https://localhost:8089/services/collector"). - with(headers: {"Authorization" => "Splunk changeme"}). - to_return(body: '{"text":"Incorrect data format","code":5,"invalid-event-number":0}', status: 400) + stub_request(:post, 'https://localhost:8089/services/collector') + .with(headers: { 'Authorization' => 'Splunk changeme' }) + .to_return(body: '{"text":"Incorrect data format","code":5,'\ + '"invalid-event-number":0}', status: 400) d = create_driver - time = Time.parse("2010-01-02 13:14:15 UTC").to_i - d.emit({ "message" => "1" }, time) + time = Time.parse('2010-01-02 13:14:15 UTC').to_i + d.emit({ 'message' => '1' }, time) d.run - assert_requested :post, "https://localhost:8089/services/collector", - headers: {"Authorization" => "Splunk changeme"}, - body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: "1" }, - times: 1 + assert_requested :post, 'https://localhost:8089/services/collector', + headers: { 'Authorization' => 'Splunk changeme' }, + body: { time: time, + source: 'test', + sourcetype: 'fluentd', + host: '', + index: 'main', + event: '1' }, + times: 1 end def test_5XX_error_retry request_count = 0 - stub_request(:post, "https://localhost:8089/services/collector"). - with(headers: {"Authorization" => "Splunk changeme"}). - to_return do |request| + stub_request(:post, 'https://localhost:8089/services/collector') + .with(headers: { 'Authorization' => 'Splunk changeme' }) + .to_return do |_request| request_count += 1 if request_count < 5 @@ -94,86 +113,96 @@ def test_5XX_error_retry end end - - d = create_driver(CONFIG + %[ + d = create_driver(CONFIG + %( post_retry_max 5 post_retry_interval 0.1 - ]) + )) - time = Time.parse("2010-01-02 13:14:15 UTC").to_i - d.emit({ "message" => "1" }, time) + time = Time.parse('2010-01-02 13:14:15 UTC').to_i + d.emit({ 'message' => '1' }, time) d.run - assert_requested :post, "https://localhost:8089/services/collector", - headers: {"Authorization" => "Splunk changeme"}, - body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: "1" }, - times: 5 + assert_requested :post, 'https://localhost:8089/services/collector', + headers: { 'Authorization' => 'Splunk changeme' }, + body: { time: time, + source: 'test', + sourcetype: 'fluentd', + host: '', + index: 'main', + event: '1' }, + times: 5 end def test_write_splitting - stub_request(:post, "https://localhost:8089/services/collector"). - with(headers: {"Authorization" => "Splunk changeme"}). - to_return(body: '{"text":"Incorrect data format","code":5,"invalid-event-number":0}', status: 400) + stub_request(:post, 'https://localhost:8089/services/collector') + .with(headers: { 'Authorization' => 'Splunk changeme' }) + .to_return(body: '{"text":"Incorrect data format","code":5,'\ + '"invalid-event-number":0}', status: 400) # A single msg is ~110 bytes - d = create_driver(CONFIG + %[ + d = create_driver(CONFIG + %( batch_size_limit 250 - ]) - - time = Time.parse("2010-01-02 13:14:15 UTC").to_i - d.emit({"message" => "a" }, time) - d.emit({"message" => "b" }, time) - d.emit({"message" => "c" }, time) - d.run - - assert_requested :post, "https://localhost:8089/services/collector", - headers: {"Authorization" => "Splunk changeme"}, - body: - { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: "a" }.to_json + - { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: "b" }.to_json, - times: 1 - assert_requested :post, "https://localhost:8089/services/collector", - headers: {"Authorization" => "Splunk changeme"}, - body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: "c" }.to_json, - times: 1 - assert_requested :post, "https://localhost:8089/services/collector", times: 2 - end + )) - def test_utf8 - stub_request(:post, "https://localhost:8089/services/collector"). - with(headers: {"Authorization" => "Splunk changeme"}). - to_return(body: '{"text":"Success","code":0}') - - d = create_driver(CONFIG + %[ - all_items true - ]) - - time = Time.parse("2010-01-02 13:14:15 UTC").to_i - d.emit({ "some" => { "nested" => "ü†f-8".force_encoding("BINARY"), "with" => ['ü', '†', 'f-8'].map {|c| c.force_encoding("BINARY") } } }, time) + time = Time.parse('2010-01-02 13:14:15 UTC').to_i + d.emit({ 'message' => 'a' }, time) + d.emit({ 'message' => 'b' }, time) + d.emit({ 'message' => 'c' }, time) d.run - assert_requested :post, "https://localhost:8089/services/collector", - headers: {"Authorization" => "Splunk changeme"}, - body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: { some: { nested: " f-8", with: [" "," ","f-8"]}}}, - times: 1 + assert_requested :post, 'https://localhost:8089/services/collector', + headers: { 'Authorization' => 'Splunk changeme' }, + body: '{"time":' + time.to_s + ',"host":"","sourcetype":"'\ + 'fluentd","index":"main","source":"test","event":'\ + '"a"}{"time":' + time.to_s + ',"host":"",'\ + '"sourcetype":"fluentd","index":"main","source":'\ + '"test","event":"b"}', + times: 1 + assert_requested :post, 'https://localhost:8089/services/collector', + headers: { 'Authorization' => 'Splunk changeme' }, + body: { time: time, + source: 'test', + sourcetype: 'fluentd', + host: '', + index: 'main', + event: 'c' }, + times: 1 + assert_requested :post, 'https://localhost:8089/services/collector', + times: 2 end def test_utf8 - stub_request(:post, "https://localhost:8089/services/collector"). - with(headers: {"Authorization" => "Splunk changeme"}). - to_return(body: '{"text":"Success","code":0}') + stub_request(:post, 'https://localhost:8089/services/collector') + .with(headers: { 'Authorization' => 'Splunk changeme' }) + .to_return(body: '{"text":"Success","code":0}') - d = create_driver(CONFIG + %[ + d = create_driver(CONFIG + %( all_items true - ]) - - time = Time.parse("2010-01-02 13:14:15 UTC").to_i - d.emit({ "some" => { "nested" => "ü†f-8".force_encoding("BINARY"), "with" => ['ü', '†', 'f-8'].map {|c| c.force_encoding("BINARY") } } }, time) + )) + + time = Time.parse('2010-01-02 13:14:15 UTC').to_i + d.emit({ + 'some' => { + 'nested' => 'ü†f-8'.force_encoding('BINARY'), + 'with' => ['ü', '†', 'f-8'] + .map { |c| c.force_encoding('BINARY') } + } + }, time) d.run - assert_requested :post, "https://localhost:8089/services/collector", - headers: {"Authorization" => "Splunk changeme"}, - body: { time: time, source: "test", sourcetype: "fluentd", host: "", index: "main", event: { some: { nested: " f-8", with: [" "," ","f-8"]}}}, - times: 1 + assert_requested :post, 'https://localhost:8089/services/collector', + headers: { 'Authorization' => 'Splunk changeme' }, + body: { time: time, + source: 'test', + sourcetype: 'fluentd', + host: '', + index: 'main', + event: { + some: { + nested: ' f-8', + with: [' ', ' ', 'f-8'] + } + } }, + times: 1 end end