33require "logstash/inputs/base"
44require "logstash/inputs/threadable"
55require 'redis'
6+ require 'concurrent'
7+ require 'concurrent/executors'
68
79# This input will read events from a Redis instance; it supports both Redis channels and lists.
810# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
@@ -49,16 +51,30 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
4951 config :key , :validate => :string , :required => true
5052
5153 # Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
52- # key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
53- # If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
54- config :data_type , :validate => [ "list" , "channel" , "pattern_channel" ] , :required => true
54+ # key. If `data_type` is `pattern_list`, then we will spawn a number of worker
55+ # threads that will LPOP from keys matching that pattern. If `data_type` is
56+ # `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`,
57+ # then we will PSUBSCRIBE to the key.
58+ config :data_type , :validate => [ "list" , "pattern_list" , "channel" , "pattern_channel" ] , :required => true
5559
5660 # The number of events to return from Redis using EVAL.
5761 config :batch_count , :validate => :number , :default => 125
5862
5963 # Redefined Redis commands to be passed to the Redis client.
6064 config :command_map , :validate => :hash , :default => { }
6165
66+ # Maximum number of worker threads to spawn when using `data_type` `pattern_list`.
67+ config :pattern_list_threads , :validate => :number , :default => 20
68+
69+ # Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`.
70+ # After the list is empty or this number of items have been processed, the thread will exit and a
71+ # new one will be started if there are non-empty lists matching the pattern without a consumer.
72+ config :pattern_list_max_items , :validate => :number , :default => 1000
73+
74+ # Time to sleep in main loop after checking if more threads can/need to be spawned.
75+ # Applies to `data_type` is `pattern_list`
76+ config :pattern_list_threadpool_sleep , :validate => :number , :default => 0.2
77+
6278 public
6379 # public API
6480 # use to store a proc that can provide a Redis instance or mock
@@ -77,6 +93,15 @@ def new_redis_instance
7793 @redis_builder . call
7894 end
7995
96+ def init_threadpool
97+ @threadpool ||= Concurrent ::ThreadPoolExecutor . new (
98+ min_threads : @pattern_list_threads ,
99+ max_threads : @pattern_list_threads ,
100+ max_queue : 2 * @pattern_list_threads
101+ )
102+ @current_workers ||= Concurrent ::Set . new
103+ end
104+
80105 def register
81106 @redis_url = @path . nil? ? "redis://#{ @password } @#{ @host } :#{ @port } /#{ @db } " : "#{ @password } @#{ @path } /#{ @db } "
82107
@@ -86,6 +111,9 @@ def register
86111 if @data_type == 'list' || @data_type == 'dummy'
87112 @run_method = method ( :list_runner )
88113 @stop_method = method ( :list_stop )
114+ elsif @data_type == 'pattern_list'
115+ @run_method = method ( :list_pattern_runner )
116+ @stop_method = method ( :list_pattern_stop )
89117 elsif @data_type == 'channel'
90118 @run_method = method ( :channel_runner )
91119 @stop_method = method ( :subscribe_stop )
@@ -193,13 +221,18 @@ def queue_event(msg, output_queue, channel=nil)
193221 end
194222
195223 # private
196- def list_stop
224+ def reset_redis
197225 return if @redis . nil? || !@redis . connected?
198226
199227 @redis . quit rescue nil
200228 @redis = nil
201229 end
202230
231+ # private
232+ def list_stop
233+ reset_redis
234+ end
235+
203236 # private
204237 def list_runner ( output_queue )
205238 while !stop?
@@ -217,6 +250,88 @@ def list_runner(output_queue)
217250 end
218251 end
219252
253+ #private
254+ def reset_threadpool
255+ return if @threadpool . nil?
256+ @threadpool . shutdown
257+ @threadpool . wait_for_termination
258+ @threadpool = nil
259+ end
260+
261+ # private
262+ def list_pattern_stop
263+ reset_redis
264+ reset_threadpool
265+ end
266+
267+ # private
268+ def worker_consume ( output_queue , key )
269+ @logger . debug ( "Launched worker for #{ key } " )
270+ redis = new_redis_instance
271+ begin
272+ ( 0 ...@pattern_list_max_items ) . each do
273+ if stop?
274+ @logger . debug ( "Breaking from thread #{ key } as it was requested to stop" )
275+ break
276+ end
277+ value = redis . lpop ( key )
278+ break if value . nil?
279+ queue_event ( value , output_queue )
280+ end
281+ ensure
282+ redis . quit rescue nil
283+ end
284+ @logger . debug ( "Exit worker for #{ key } " )
285+ end
286+
287+ # private
288+ def threadpool_capacity?
289+ @threadpool . remaining_capacity > 0
290+ end
291+
292+ # private
293+ def launch_worker ( output_queue , key )
294+ @current_workers . add ( key )
295+ @threadpool . post do
296+ begin
297+ worker_consume ( output_queue , key )
298+ ensure
299+ @current_workers . delete ( key )
300+ end
301+ end
302+ end
303+
304+ # private
305+ def ensure_workers ( output_queue )
306+ return unless threadpool_capacity?
307+ @redis . keys ( @key ) . shuffle . each do |key |
308+ next if @current_workers . include? ( key )
309+ launch_worker ( output_queue , key )
310+ break unless threadpool_capacity?
311+ end
312+ end
313+
314+ # private
315+ def init_list_pattern_runner
316+ init_threadpool
317+ @redis ||= connect
318+ end
319+
320+ # private
321+ def list_pattern_runner ( output_queue )
322+ while !stop?
323+ begin
324+ init_list_pattern_runner if @redis . nil?
325+ ensure_workers ( output_queue )
326+ sleep ( @pattern_list_threadpool_sleep )
327+ rescue ::Redis ::BaseError => e
328+ @logger . warn ( "Redis connection problem" , :exception => e )
329+ @redis = nil
330+ sleep 1
331+ end
332+ end
333+ end
334+
220335 def list_batch_listener ( redis , output_queue )
221336 begin
222337 results = redis . evalsha ( @redis_script_sha , [ @key ] , [ @batch_count -1 ] )
0 commit comments