diff --git a/fluent-plugin-out-http.gemspec b/fluent-plugin-out-http.gemspec index 62762d1..5aceb8d 100644 --- a/fluent-plugin-out-http.gemspec +++ b/fluent-plugin-out-http.gemspec @@ -2,9 +2,9 @@ Gem::Specification.new do |gem| gem.name = "fluent-plugin-out-http" - gem.version = "1.3.4" - gem.authors = ["Marica Odagaki"] - gem.email = ["ento.entotto@gmail.com"] + gem.version = "2.0.0" + gem.authors = ["Matthew Cleveland"] + gem.email = ["mgcleveland@gmail.com"] gem.summary = %q{A generic Fluentd output plugin to send logs to an HTTP endpoint} gem.description = gem.summary gem.homepage = "https://github.com/fluent-plugins-nursery/fluent-plugin-out-http" diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 0056e4a..8dcdb09 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -68,6 +68,8 @@ def initialize # Compress with gzip except for form serializer config_param :compress_request, :bool, :default => false + config_param :enable_chunk_time, :bool, default: false + config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE config_set_default :chunk_keys, ['tag'] @@ -117,7 +119,7 @@ def shutdown end def format_url(tag, time, record) - @endpoint_url + @current_chunk_endpoint || @endpoint_url end def set_body(req, tag, time, record) @@ -300,10 +302,20 @@ def process(tag, es) end def write(chunk) + endpoint = @endpoint_url.dup + if @enable_chunk_time && chunk.metadata && chunk.metadata.respond_to?(:timekey) && chunk.metadata.timekey + chunk_time = Time.at(chunk.metadata.timekey).utc + formatted_time = chunk_time.strftime('%Y%m%d%H%M%S') + # Replace the placeholder with formatted time + endpoint = endpoint.gsub('CHUNK_TIME_PLACEHOLDER', formatted_time) + end + + uri = URI.parse(endpoint) + tag = chunk.metadata.tag - @endpoint_url = extract_placeholders(@endpoint_url, chunk) + @current_chunk_endpoint = endpoint - log.debug { "#{@http_method.capitalize} data to #{@endpoint_url} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } + log.debug { "#{@http_method.capitalize} data to #{@current_chunk_endpoint} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } if @bulk_request time = Fluent::Engine.now @@ -313,5 +325,7 @@ def write(chunk) handle_record(tag, time, record) end end + + @current_chunk_endpoint = nil end end