Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions fluent-plugin-out-http.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

Gem::Specification.new do |gem|
gem.name = "fluent-plugin-out-http"
gem.version = "1.3.4"
gem.authors = ["Marica Odagaki"]
gem.email = ["ento.entotto@gmail.com"]
gem.version = "2.0.0"
gem.authors = ["Matthew Cleveland"]
gem.email = ["mgcleveland@gmail.com"]
gem.summary = %q{A generic Fluentd output plugin to send logs to an HTTP endpoint}
gem.description = gem.summary
gem.homepage = "https://github.com/fluent-plugins-nursery/fluent-plugin-out-http"
Expand Down
20 changes: 17 additions & 3 deletions lib/fluent/plugin/out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def initialize
# Compress with gzip except for form serializer
config_param :compress_request, :bool, :default => false

config_param :enable_chunk_time, :bool, default: false

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
config_set_default :chunk_keys, ['tag']
Expand Down Expand Up @@ -117,7 +119,7 @@ def shutdown
end

def format_url(tag, time, record)
@endpoint_url
@current_chunk_endpoint || @endpoint_url
end

def set_body(req, tag, time, record)
Expand Down Expand Up @@ -300,10 +302,20 @@ def process(tag, es)
end

def write(chunk)
endpoint = @endpoint_url.dup
if @enable_chunk_time && chunk.metadata && chunk.metadata.respond_to?(:timekey) && chunk.metadata.timekey
chunk_time = Time.at(chunk.metadata.timekey).utc
formatted_time = chunk_time.strftime('%Y%m%d%H%M%S')
# Replace the placeholder with formatted time
endpoint = endpoint.gsub('CHUNK_TIME_PLACEHOLDER', formatted_time)
end

uri = URI.parse(endpoint)

tag = chunk.metadata.tag
@endpoint_url = extract_placeholders(@endpoint_url, chunk)
@current_chunk_endpoint = endpoint

log.debug { "#{@http_method.capitalize} data to #{@endpoint_url} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" }
log.debug { "#{@http_method.capitalize} data to #{@current_chunk_endpoint} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" }

if @bulk_request
time = Fluent::Engine.now
Expand All @@ -313,5 +325,7 @@ def write(chunk)
handle_record(tag, time, record)
end
end

@current_chunk_endpoint = nil
end
end