From 8b83d1f5494e943a01140ddd892d413a0364f848 Mon Sep 17 00:00:00 2001 From: Matt Cleveland Date: Tue, 30 Sep 2025 14:06:28 -0400 Subject: [PATCH] Add support for including the chunk timestamp in request parameters Adds a new enable_chunk_time configuration boolean parameter. When set to true, any endpoint_url with CHUNK_TIME_PLACEHOLDER will have that string literal replaced with the timestamp at which the buffer chunk was created. This information may be useful to upstream recipients of the HTTP request to gain insight on the age of the data contained in the payload. --- fluent-plugin-out-http.gemspec | 6 +++--- lib/fluent/plugin/out_http.rb | 20 +++++++++++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/fluent-plugin-out-http.gemspec b/fluent-plugin-out-http.gemspec index 62762d1..5aceb8d 100644 --- a/fluent-plugin-out-http.gemspec +++ b/fluent-plugin-out-http.gemspec @@ -2,9 +2,9 @@ Gem::Specification.new do |gem| gem.name = "fluent-plugin-out-http" - gem.version = "1.3.4" - gem.authors = ["Marica Odagaki"] - gem.email = ["ento.entotto@gmail.com"] + gem.version = "2.0.0" + gem.authors = ["Matthew Cleveland"] + gem.email = ["mgcleveland@gmail.com"] gem.summary = %q{A generic Fluentd output plugin to send logs to an HTTP endpoint} gem.description = gem.summary gem.homepage = "https://github.com/fluent-plugins-nursery/fluent-plugin-out-http" diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb index 0056e4a..8dcdb09 100644 --- a/lib/fluent/plugin/out_http.rb +++ b/lib/fluent/plugin/out_http.rb @@ -68,6 +68,8 @@ def initialize # Compress with gzip except for form serializer config_param :compress_request, :bool, :default => false + config_param :enable_chunk_time, :bool, default: false + config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE config_set_default :chunk_keys, ['tag'] @@ -117,7 +119,7 @@ def shutdown end def format_url(tag, time, record) - @endpoint_url + @current_chunk_endpoint || @endpoint_url end def set_body(req, tag, time, record) @@ -300,10 +302,20 @@ def process(tag, es) end def write(chunk) + endpoint = @endpoint_url.dup + if @enable_chunk_time && chunk.metadata && chunk.metadata.respond_to?(:timekey) && chunk.metadata.timekey + chunk_time = Time.at(chunk.metadata.timekey).utc + formatted_time = chunk_time.strftime('%Y%m%d%H%M%S') + # Replace the placeholder with formatted time + endpoint = endpoint.gsub('CHUNK_TIME_PLACEHOLDER', formatted_time) + end + + uri = URI.parse(endpoint) + tag = chunk.metadata.tag - @endpoint_url = extract_placeholders(@endpoint_url, chunk) + @current_chunk_endpoint = endpoint - log.debug { "#{@http_method.capitalize} data to #{@endpoint_url} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } + log.debug { "#{@http_method.capitalize} data to #{@current_chunk_endpoint} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } if @bulk_request time = Fluent::Engine.now @@ -313,5 +325,7 @@ def write(chunk) handle_record(tag, time, record) end end + + @current_chunk_endpoint = nil end end