Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ This plugin will help to gathering status log from these status api.
* [Monitor Status Code](#monitoring-http-status-code-only)
* [Override User Agent](#override-user-agent)
* [HTTP Basic Auth](#http-basic-auth)
* [HTTP Login with Payload](#http-login-with-payload)
* [HTTP Pull with Payload](#http-pull-with-payload)
* [Multiple Events](#multiple-events)
* [Encapsulated items](#encapsulated-items)
* [HTTP Proxy](#http-proxy)
* [Logging Response Header](#logging-http-response-header)
* [Custom Request Header](#custom-request-header)
Expand Down Expand Up @@ -161,6 +165,95 @@ You can use `user`, `password` options to provide authentication information.
# 2017-05-17 21:41:47.872951000 +0900 status: {"url":"http://yourinfrastructure/api/status.json","status":200,"message":{ ... }}
```

### HTTP login with payload

If your infrastructure use cookies to manage the login session, and the login path is different to query path, you can use `login_path`, `login_payload`, and `path` options to provide authentication information.

For example, login url is https://localhost:8080/login, query url is https://localhost:8080/search

```
<source>
@type http_pull

tag status
url https://localhost:8080
path search
interval 1s

login_path login
login_payload {"username":"tester","password":"drowssaP"}
verify_ssl false

format json
</source>
```

### HTTP pull with payload

You can send json format `payload` to togather with the query.

```
<source>
@type http_pull

tag status
url https://localhost:8080/search
payload {"max-results": 1500}

interval 1s
</source>
```

### Multiple events

If the server returns multiple events per request, for example

```
[{"message": "message 1"},
{"message": "message 2"}]
```
This can be handled by specify option `multi_event true`

```
<source>
@type http_pull

tag status
url https://localhost:8080/search
multi_event true

interval 1s
</source>
```

### Encapsulated items

If the expected items are encapsulated in json structure, for example,

```
{"meta": {"server": "localhost"},
"items": [
{"message": "message 1"},
{"message": "message 2"}
]
}
```

You can fetch the messages by setting up option `event_key`,

```
<source>
@type http_pull

tag status
url https://localhost:8080/search
multi_event true
event_key items

interval 1s
</source>
```

### HTTP proxy

You can send your requests via proxy server.
Expand Down Expand Up @@ -344,6 +437,31 @@ The user for basic auth

The password for basic auth

### Authentication Session Configuration

#### login_path (string) (optional, default: nil)

The subpath of the login url.

#### login_payload (hash) (optional, default: nil)

The payload send for authentication.
Note: login_path and login_payload has to be both nil or both not-nil.

### Request Configuration

#### payload (hash) (optional, default: nil)

The query payload sent to server.

#### multi_event (bool) (optional, default: false)

Whether the response contains multiple events.

#### event_key (string) (optional, default: nil)

The key of the expected items in a json format response.

### Req/Resp Header Configuration

#### response_header (section) (optional, default: nil)
Expand Down
121 changes: 99 additions & 22 deletions lib/fluent/plugin/in_http_pull.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require "fluent/log"
require "fluent/plugin/input"
require "rest-client"

Expand All @@ -33,6 +34,18 @@ def initialize
desc 'The url of monitoring target'
config_param :url, :string

desc 'The path of monitoring target'
config_param :path, :string, default: nil

desc 'Payload to query target'
config_param :payload, :hash, default: nil

desc 'Message key'
config_param :event_key, :string, default: nil

desc 'Response contains multiple events'
config_param :multi_event, :bool, default: false

desc 'The interval time between periodic request'
config_param :interval, :time

Expand All @@ -59,6 +72,13 @@ def initialize
desc 'password of basic auth'
config_param :password, :string, default: nil, secret: true

# login session
desc 'login path'
config_param :login_path, :string, default: nil

desc 'login payload'
config_param :login_payload, :hash, default: nil

# req/res header options
config_section :response_header, param_name: :response_headers, multi: true do
desc 'The name of header to cature from response'
Expand Down Expand Up @@ -88,6 +108,9 @@ def configure(conf)
compat_parameters_convert(conf, :parser)
super

if (@login_path && !@login_payload ) || (!@login_path && @login_payload)
raise Fluent::ConfigError, "login_path and login_payload should be both set or unset"
end
@parser = parser_create unless @status_only
@_request_headers = {
"Content-Type" => "application/x-www-form-urlencoded",
Expand All @@ -100,6 +123,7 @@ def configure(conf)
end.to_h)

@http_method = :head if @status_only
@cookies = {}
end

def start
Expand All @@ -111,32 +135,56 @@ def start
def on_timer
body = nil
record = nil

record_time = Engine.now
emit_stream = Fluent::MultiEventStream.new
base_site = RestClient::Resource.new(@url, request_options)
login_site = @login_path ? base_site[@login_path] : base_site
query_site = @path ? base_site[@path] : base_site
begin
res = RestClient::Request.execute request_options
record, body = get_record(res)

get_session_cookie(login_site)
rescue StandardError => err
record = { "url" => @url, "error" => err.message }
if err.respond_to? :http_code
record["status"] = err.http_code || 0
record = log_error(login_site.url, err)
emit_stream.add(record_time, record)
router.emit_stream(@tag, emit_stream)
return
end
begin
if @payload
res = query_site.method(@http_method).call(@payload.to_json, :cookies=>@cookies)
else
record["status"] = 0
res = query_site.method(@http_method).call(:cookie=>@cookies)
end

record, body = get_record(res, query_site.url)
process_events(record, body, emit_stream)
rescue StandardError => err
record = log_error(query_site.url, err)
# reset session cookie if it is expired
@cookies = {} if not @cookies.empty? and record["status"] == 401
emit_stream.add(record_time, record)
end

record_time = Engine.now
record = parse(record, body)
router.emit(@tag, record_time, record)
router.emit_stream(@tag, emit_stream)
end

def shutdown
super
end

private
def log_error(url, err)
record = { "url" => url, "error" => err.message }
if err.respond_to? :http_code
record["status"] = err.http_code || 0
else
record["status"] = 0
end
log.error(record)
return record
end

def request_options
options = { method: @http_method, url: @url, timeout: @timeout, headers: @_request_headers }
options = { timeout: @timeout, headers: @_request_headers }

options[:proxy] = @proxy if @proxy
options[:user] = @user if @user
Expand All @@ -151,30 +199,59 @@ def request_options
return options
end

def get_record(response)
def get_session_cookie(resource)
@cookies = {} if @cookies.nil?
return unless @login_path and @login_payload and @cookies.empty?
login_response = resource.post(@login_payload.to_json)
if login_response.code != 200
raise RestClient::ExceptionWithResponse.new(nil, login_response.code)
else
@cookies = login_response.cookie_jar
end

end

def get_record(response, url)
body = response.body
record = { "url" => @url, "status" => response.code }
record = { "url" => url, "status" => response.code }
record["header"] = {} unless @response_headers.empty?
@response_headers.each do |section|
name = section["header"]
symbolize_name = name.downcase.gsub(/-/, '_').to_sym

record["header"][name] = response.headers[symbolize_name]
end

return record, body
end

def parse(record, body)
if !@status_only && body != nil
@parser.parse(body) do |time, message|
record["message"] = message
record_time = time
def process_events(record, body, es)

return es.add(Engine.now, record) if @status_only or body == nil

# consume errors produced by parser by logging it
begin
@parser.parse(body) do |time, events|

events = events[@event_key] if @multi_event and @event_key || []

# if @event_key not found, events will be converted to empty Array.
log.warning("event_key '#{@event_key}' not found") if events == []

# if each query result is a record, covert it to array.
events = [ events ] unless @multi_event

events.each do |event|
item = record.dup
item['message'] = event
es.add(time, item)
end
end
rescue StandardError => err
log.error("Failed to process result with error: #{err}")
log.debug("Failed to parse #{body}")
end

return record
end

end
end
end
22 changes: 22 additions & 0 deletions test/helper/stub_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ def initialize(port=3939, ssl_enable=false)
@server.mount_proc '/custom_header', &method(:custom_header)

@server.mount_proc '/method_post', &method(:method_post)

@server.mount_proc '/login', &method(:login)
@server.mount_proc '/session_events', &method(:session_events)

@server.mount '/method_delete', DeleteService

end

def start
Expand Down Expand Up @@ -131,4 +136,21 @@ def method_post(req, res)
res.body = '{ "status": "OK" }'
end
end

def login(req, res)
if req.body and JSON.parse(req.body) == {"username"=>"admin", "password"=>"pwd"}
res.status = 200
res['Content-Type'] = 'application/json'
res.cookies.push WEBrick::Cookie.new("session", "1")
else
res.status = 401
end
end

def session_events(req, res)
res.status = 200
res['Content-Type'] = 'application/json'
res.body = '{"ListMeta":{},"items":[{"kind":"Event","meta":{"name":"1","uuid":"c51d9e82"}},{"kind":"Event","meta":{"name":"2","uuid":"b1b5686d"}}]}'
end

end
2 changes: 1 addition & 1 deletion test/plugin/test_in_http_pull.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require "helper"
require "fluent/plugin/in_http_pull.rb"
require "fluent/plugin/in_http_pull"

require 'ostruct'

Expand Down
1 change: 1 addition & 0 deletions test/plugin/test_in_http_pull_auth.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class HttpPullInputTestAuth < Test::Unit::TestCase
assert_equal("test", tag)

assert_equal("http://localhost:3939/protected", record["url"])
puts "tag=#{tag}, time=#{time}, record=#{record}, time_class=#{time.class}"
assert(time.is_a?(Fluent::EventTime))

assert_equal(401, record["status"])
Expand Down
Loading