From 4feab829a1852b014c567ee3d732cfe583274160 Mon Sep 17 00:00:00 2001 From: Ryan Li Date: Fri, 10 Jul 2020 18:27:55 -0700 Subject: [PATCH 1/4] support login session, query payload, and multi events --- Dockerfile | 20 +++ README.md | 118 ++++++++++++++++++ lib/fluent/plugin/in_http_pull.rb | 99 ++++++++++++--- test/helper/stub_server.rb | 22 ++++ test/plugin/test_in_http_pull.rb | 2 +- test/plugin/test_in_http_pull_auth.rb | 1 + .../plugin/test_in_http_pull_session_event.rb | 103 +++++++++++++++ 7 files changed, 347 insertions(+), 18 deletions(-) create mode 100644 Dockerfile create mode 100644 test/plugin/test_in_http_pull_session_event.rb diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5fbae12 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM quay.io/fluentd_elasticsearch/fluentd:v3.0.2 as builder + +RUN apt-get update && apt-get install -y git build-essential autoconf automake libtool libsnappy-dev + +RUN mkdir -p /opt/app/fluent-plugin-http-pull +WORKDIR /opt/app/fluent-plugin-http-pull +ADD . ./ + +RUN bundle config set without 'development' && bundler install && rake install + +RUN gem install fluent-plugin-kafka --no-document -v 0.13.0 && \ + gem install snappy --no-document -v 0.0.17 && \ + gem install extlz4 --no-document -v 0.3.1 + +WORKDIR / + +RUN gem cleanup && \ + rm -rf /opt/app/fluent-plugin-http-pull && \ + apt-get remove --autoremove -y build-essential git autoconf automake && \ + apt-get autoclean \ No newline at end of file diff --git a/README.md b/README.md index 9a3b6fc..6222979 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,10 @@ This plugin will help to gathering status log from these status api. * [Monitor Status Code](#monitoring-http-status-code-only) * [Override User Agent](#override-user-agent) * [HTTP Basic Auth](#http-basic-auth) + * [HTTP Login with Payload](#http-login-with-payload) + * [HTTP Pull with Payload](#http-pull-with-payload) + * [Multiple Events](#multiple-events) + * [Encapsulated items](#encapsulated-items) * [HTTP Proxy](#http-proxy) * [Logging Response Header](#logging-http-response-header) * [Custom Request Header](#custom-request-header) @@ -161,6 +165,95 @@ You can use `user`, `password` options to provide authentication information. # 2017-05-17 21:41:47.872951000 +0900 status: {"url":"http://yourinfrastructure/api/status.json","status":200,"message":{ ... }} ``` +### HTTP login with payload + +If your infrastructure use cookies to manage the login session, and the login path is different to query path, you can use `login_path`, `login_payload`, and `path` options to provide authentication information. + +For example, login url is https://localhost:8080/login, query url is https://localhost:8080/search + +``` + + @type http_pull + + tag status + url https://localhost:8080 + path search + interval 1s + + login_path lobin + login_payload {"username":"tester","password":"drowssaP"} + verify_ssl false + + format json + +``` + +### HTTP pull with payload + +You can send json format `payload` to togather with the query. + +``` + + @type http_pull + + tag status + url https://localhost:8080/search + payload {"max-results": 1500} + + interval 1s + +``` + +### Multiple events + +If the server returns multiple events per request, for example + +``` +[{"message": "message 1"}, + {"message": "message 2"}] +``` +This can be handled by specify option `multi_event true` + +``` + + @type http_pull + + tag status + url https://localhost:8080/search + multi_event true + + interval 1s + +``` + +### Encapsulated items + +If the expected items are encapsulated in json structure, for example, + +``` +{"meta": {"server": "localhost"}, + "iterms": [ + {"message": "message 1"}, + {"message": "message 2"} + ] +} +``` + +You can fetch the messages by setting up option `event_key`, + +``` + + @type http_pull + + tag status + url https://localhost:8080/search + multi_event true + event_key items + + interval 1s + +``` + ### HTTP proxy You can send your requests via proxy server. @@ -344,6 +437,31 @@ The user for basic auth The password for basic auth +### Authentication Session Configuration + +#### login_path (string) (optional, default: nil) + +The subpath of the login url. + +#### login_payload (hash) (optional, default: nil) + +The payload send for authentication. +Note: login_path and login_payload has to be both nil or both not-nil. + +### Request Configuration + +#### payload (hash) (optional, default: nil) + +The query payload sent to server. + +#### multi_event (bool) (optional, default: false) + +Whether the response contains multiple events. + +#### event_key (string) (optional, default: nil) + +The key of the expected items in a json format response. + ### Req/Resp Header Configuration #### response_header (section) (optional, default: nil) diff --git a/lib/fluent/plugin/in_http_pull.rb b/lib/fluent/plugin/in_http_pull.rb index 6568320..6389c33 100644 --- a/lib/fluent/plugin/in_http_pull.rb +++ b/lib/fluent/plugin/in_http_pull.rb @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +require "fluent/log" require "fluent/plugin/input" require "rest-client" @@ -33,6 +34,18 @@ def initialize desc 'The url of monitoring target' config_param :url, :string + desc 'The path of monitoring target' + config_param :path, :string, default: nil + + desc 'Payload to query target' + config_param :payload, :hash, default: nil + + desc 'Message key' + config_param :event_key, :string, default: nil + + desc 'Response contains multiple events' + config_param :multi_event, :bool, default: false + desc 'The interval time between periodic request' config_param :interval, :time @@ -59,6 +72,13 @@ def initialize desc 'password of basic auth' config_param :password, :string, default: nil, secret: true + # login session + desc 'login path' + config_param :login_path, :string, default: nil + + desc 'login payload' + config_param :login_payload, :hash, default: nil + # req/res header options config_section :response_header, param_name: :response_headers, multi: true do desc 'The name of header to cature from response' @@ -88,6 +108,9 @@ def configure(conf) compat_parameters_convert(conf, :parser) super + if (@login_path && !@login_payload ) || (!@login_path && @login_payload) + raise Fluent::ConfigError, "login_path and login_payload should be both set or unset" + end @parser = parser_create unless @status_only @_request_headers = { "Content-Type" => "application/x-www-form-urlencoded", @@ -111,23 +134,34 @@ def start def on_timer body = nil record = nil + record_time = Engine.now + emit_stream = Fluent::MultiEventStream.new + site = RestClient::Resource.new(@url, request_options) begin - res = RestClient::Request.execute request_options - record, body = get_record(res) + cookies = get_session_cookie(site) + + site = site[@path] if @path + if @payload + res = site.method(@http_method).call(@payload.to_json, :cookies=>cookies) + else + res = site.method(@http_method).call(:cookie=>cookies) + end + record, body = get_record(res, site.url) + process_events(record, body, emit_stream) rescue StandardError => err - record = { "url" => @url, "error" => err.message } + record = { "url" => site.url, "error" => err.message } if err.respond_to? :http_code record["status"] = err.http_code || 0 else record["status"] = 0 end + log.error(record) + emit_stream.add(record_time, record) end - record_time = Engine.now - record = parse(record, body) - router.emit(@tag, record_time, record) + router.emit_stream(@tag, emit_stream) end def shutdown @@ -136,7 +170,7 @@ def shutdown private def request_options - options = { method: @http_method, url: @url, timeout: @timeout, headers: @_request_headers } + options = { timeout: @timeout, headers: @_request_headers } options[:proxy] = @proxy if @proxy options[:user] = @user if @user @@ -151,9 +185,23 @@ def request_options return options end - def get_record(response) + def get_session_cookie(resource) + cookies = {} + return cookies unless @login_path and @login_payload + + login_response = resource[@login_path].post(@login_payload.to_json) + if login_response.code != 200 + raise RestClient::ExceptionWithResponse.new(nil, login_response.code) + else + cookies = login_response.cookie_jar + end + + return cookies + end + + def get_record(response, url) body = response.body - record = { "url" => @url, "status" => response.code } + record = { "url" => url, "status" => response.code } record["header"] = {} unless @response_headers.empty? @response_headers.each do |section| name = section["header"] @@ -161,20 +209,37 @@ def get_record(response) record["header"][name] = response.headers[symbolize_name] end - return record, body end - def parse(record, body) - if !@status_only && body != nil - @parser.parse(body) do |time, message| - record["message"] = message - record_time = time + def process_events(record, body, es) + + return es.add(Engine.now, record) if @status_only or body == nil + + # consume errors produced by parser by logging it + begin + @parser.parse(body) do |time, events| + + events = events[@event_key] if @multi_event and @event_key || [] + + # if @event_key not found, events will be converted to empty Array. + log.warning("event_key '#{@event_key}' not found") if events == [] + + # if each query result is a record, covert it to array. + events = [ events ] unless @multi_event + + events.each do |event| + item = record.dup + item['message'] = event + es.add(time, item) + end end + rescue StandardError => err + log.error("Failed to process result with error: #{err}") + log.debug("Failed to parse #{body}") end - - return record end + end end end diff --git a/test/helper/stub_server.rb b/test/helper/stub_server.rb index 24ab0d2..2527067 100644 --- a/test/helper/stub_server.rb +++ b/test/helper/stub_server.rb @@ -30,7 +30,12 @@ def initialize(port=3939, ssl_enable=false) @server.mount_proc '/custom_header', &method(:custom_header) @server.mount_proc '/method_post', &method(:method_post) + + @server.mount_proc '/login', &method(:login) + @server.mount_proc '/session_events', &method(:session_events) + @server.mount '/method_delete', DeleteService + end def start @@ -131,4 +136,21 @@ def method_post(req, res) res.body = '{ "status": "OK" }' end end + + def login(req, res) + if req.body and JSON.parse(req.body) == {"username"=>"admin", "password"=>"pwd"} + res.status = 200 + res['Content-Type'] = 'application/json' + res.cookies.push WEBrick::Cookie.new("session", "1") + else + res.status = 401 + end + end + + def session_events(req, res) + res.status = 200 + res['Content-Type'] = 'application/json' + res.body = '{"ListMeta":{},"items":[{"kind":"Event","meta":{"name":"1","uuid":"c51d9e82"}},{"kind":"Event","meta":{"name":"2","uuid":"b1b5686d"}}]}' + end + end diff --git a/test/plugin/test_in_http_pull.rb b/test/plugin/test_in_http_pull.rb index 8b84607..b8725be 100644 --- a/test/plugin/test_in_http_pull.rb +++ b/test/plugin/test_in_http_pull.rb @@ -1,5 +1,5 @@ require "helper" -require "fluent/plugin/in_http_pull.rb" +require "fluent/plugin/in_http_pull" require 'ostruct' diff --git a/test/plugin/test_in_http_pull_auth.rb b/test/plugin/test_in_http_pull_auth.rb index 876df9c..ec2c505 100644 --- a/test/plugin/test_in_http_pull_auth.rb +++ b/test/plugin/test_in_http_pull_auth.rb @@ -79,6 +79,7 @@ class HttpPullInputTestAuth < Test::Unit::TestCase assert_equal("test", tag) assert_equal("http://localhost:3939/protected", record["url"]) + puts "tag=#{tag}, time=#{time}, record=#{record}, time_class=#{time.class}" assert(time.is_a?(Fluent::EventTime)) assert_equal(401, record["status"]) diff --git a/test/plugin/test_in_http_pull_session_event.rb b/test/plugin/test_in_http_pull_session_event.rb new file mode 100644 index 0000000..961f5ba --- /dev/null +++ b/test/plugin/test_in_http_pull_session_event.rb @@ -0,0 +1,103 @@ +require "helper" +require "fluent/plugin/in_http_pull.rb" + +require 'ostruct' + +class HttpPullInputTestMultiEvent < Test::Unit::TestCase + @stub_server = nil + + setup do + @stub_server = StubServer.new + @stub_server.start + end + + teardown do + @stub_server.shutdown + end + + sub_test_case "multi event" do + TEST_NO_LOGIN_PAYLOAD = %[ + tag test + url http://localhost:3939 + path session_events + event_key items + multi_event true + + login_path login + + interval 3s + format json + http_method post + ] + + TEST_LOGIN_FAILURE = %[ + tag test + url http://localhost:3939 + path session_events + event_key items + multi_event true + + login_path login + login_payload {"username": "admin","password": "wrong"} + + interval 3s + format json + http_method post + ] + TEST_LOGIN_MULTI_EVENT = %[ + tag test + url http://localhost:3939 + path session_events + event_key items + multi_event true + + login_path login + login_payload {"username": "admin","password": "pwd"} + + interval 3s + format json + http_method post + ] + + test 'login failed no login payload' do + + assert_raise do + create_driver TEST_NO_LOGIN_PAYLOAD + end + end + + test 'login failed wrong credential' do + + d = create_driver TEST_LOGIN_FAILURE + d.run(timeout: 5) do + sleep 4 + end + assert_equal(1, d.events.length) + d.events.each do |tag, time, record| + assert_equal(401, record['status']) + end + end + + test 'session multi events' do + d = create_driver TEST_LOGIN_MULTI_EVENT + d.run(timeout: 5) do + sleep 4 + end + + assert_equal(2, d.events.length) + + uuids = [] + d.events.each do |tag, time, record| + uuids.push(record['message']['meta']['uuid']) + end + assert_equal(["c51d9e82","b1b5686d"], uuids) + end + + end + + private + + def create_driver(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::HttpPullInput).configure(conf) + end +end From e532836f7936f91692e4344cba6c5c35db3bbfc0 Mon Sep 17 00:00:00 2001 From: Ryan Li Date: Wed, 15 Jul 2020 13:29:07 -0700 Subject: [PATCH 2/4] address reviews comments --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6222979..983a9d6 100644 --- a/README.md +++ b/README.md @@ -180,7 +180,7 @@ For example, login url is https://localhost:8080/login, query url is https://loc path search interval 1s - login_path lobin + login_path login login_payload {"username":"tester","password":"drowssaP"} verify_ssl false @@ -232,7 +232,7 @@ If the expected items are encapsulated in json structure, for example, ``` {"meta": {"server": "localhost"}, - "iterms": [ + "items": [ {"message": "message 1"}, {"message": "message 2"} ] From 95624fea2d29acaa044010f1c28f154ad679e49e Mon Sep 17 00:00:00 2001 From: Ryan Li Date: Fri, 4 Sep 2020 09:05:20 -0700 Subject: [PATCH 3/4] remove Dockerfile --- Dockerfile | 20 -------------------- 1 file changed, 20 deletions(-) delete mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 5fbae12..0000000 --- a/Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM quay.io/fluentd_elasticsearch/fluentd:v3.0.2 as builder - -RUN apt-get update && apt-get install -y git build-essential autoconf automake libtool libsnappy-dev - -RUN mkdir -p /opt/app/fluent-plugin-http-pull -WORKDIR /opt/app/fluent-plugin-http-pull -ADD . ./ - -RUN bundle config set without 'development' && bundler install && rake install - -RUN gem install fluent-plugin-kafka --no-document -v 0.13.0 && \ - gem install snappy --no-document -v 0.0.17 && \ - gem install extlz4 --no-document -v 0.3.1 - -WORKDIR / - -RUN gem cleanup && \ - rm -rf /opt/app/fluent-plugin-http-pull && \ - apt-get remove --autoremove -y build-essential git autoconf automake && \ - apt-get autoclean \ No newline at end of file From 1673ab2eb3f731d9c82609fa077cbc814428574e Mon Sep 17 00:00:00 2001 From: Ryan Li Date: Wed, 23 Sep 2020 14:20:33 -0700 Subject: [PATCH 4/4] reduce session login attempt --- lib/fluent/plugin/in_http_pull.rb | 54 +++++++++++++++++++------------ 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/lib/fluent/plugin/in_http_pull.rb b/lib/fluent/plugin/in_http_pull.rb index 6389c33..9440ef4 100644 --- a/lib/fluent/plugin/in_http_pull.rb +++ b/lib/fluent/plugin/in_http_pull.rb @@ -123,6 +123,7 @@ def configure(conf) end.to_h) @http_method = :head if @status_only + @cookies = {} end def start @@ -136,28 +137,30 @@ def on_timer record = nil record_time = Engine.now emit_stream = Fluent::MultiEventStream.new - site = RestClient::Resource.new(@url, request_options) - + base_site = RestClient::Resource.new(@url, request_options) + login_site = @login_path ? base_site[@login_path] : base_site + query_site = @path ? base_site[@path] : base_site + begin + get_session_cookie(login_site) + rescue StandardError => err + record = log_error(login_site.url, err) + emit_stream.add(record_time, record) + router.emit_stream(@tag, emit_stream) + return + end begin - cookies = get_session_cookie(site) - - site = site[@path] if @path if @payload - res = site.method(@http_method).call(@payload.to_json, :cookies=>cookies) + res = query_site.method(@http_method).call(@payload.to_json, :cookies=>@cookies) else - res = site.method(@http_method).call(:cookie=>cookies) + res = query_site.method(@http_method).call(:cookie=>@cookies) end - record, body = get_record(res, site.url) + record, body = get_record(res, query_site.url) process_events(record, body, emit_stream) rescue StandardError => err - record = { "url" => site.url, "error" => err.message } - if err.respond_to? :http_code - record["status"] = err.http_code || 0 - else - record["status"] = 0 - end - log.error(record) + record = log_error(query_site.url, err) + # reset session cookie if it is expired + @cookies = {} if not @cookies.empty? and record["status"] == 401 emit_stream.add(record_time, record) end @@ -169,6 +172,17 @@ def shutdown end private + def log_error(url, err) + record = { "url" => url, "error" => err.message } + if err.respond_to? :http_code + record["status"] = err.http_code || 0 + else + record["status"] = 0 + end + log.error(record) + return record + end + def request_options options = { timeout: @timeout, headers: @_request_headers } @@ -186,17 +200,15 @@ def request_options end def get_session_cookie(resource) - cookies = {} - return cookies unless @login_path and @login_payload - - login_response = resource[@login_path].post(@login_payload.to_json) + @cookies = {} if @cookies.nil? + return unless @login_path and @login_payload and @cookies.empty? + login_response = resource.post(@login_payload.to_json) if login_response.code != 200 raise RestClient::ExceptionWithResponse.new(nil, login_response.code) else - cookies = login_response.cookie_jar + @cookies = login_response.cookie_jar end - return cookies end def get_record(response, url)