@@ -53,8 +53,10 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
5353 config :key , :validate => :string , :required => true
5454
5555 # Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
56- # key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
57- # If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
56+ # key. If `data_type` is `pattern_list`, then we will spawn a number of worker
57+ # threads that will LPOP from keys matching that pattern. If `data_type` is
58+ # `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`,
59+ # then we will PSUBSCRIBE to the key.
5860 config :data_type , :validate => [ "list" , "pattern_list" , "channel" , "pattern_channel" ] , :required => true
5961
6062 # The number of events to return from Redis using EVAL.
@@ -63,8 +65,18 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
6365 # Redefined Redis commands to be passed to the Redis client.
6466 config :command_map , :validate => :hash , :default => { }
6567
68+ # Maximum number of worker threads to spawn when using `data_type` `pattern_list`.
6669 config :worker_thread_count , :validate => :number , :default => 20
6770
71+ # Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`.
72+ # After the queue is empty or this number of items have been processed, the thread will exit and a
73+ # new one will be started.
74+ config :max_items_per_worker , :validate => :number , :default => 1000
75+
76+ # Time to sleep in main loop after checking if more threads can/need to be spawned.
77+ # Applies to `data_type` is `pattern_list`
78+ config :threadpool_queue_sleep , :validate => :number , :default => 0.2
79+
6880 public
6981 # public API
7082 # use to store a proc that can provide a Redis instance or mock
@@ -252,7 +264,7 @@ def launch_worker(output_queue, key)
252264 @logger . debug ( "Launched worker for #{ key } " )
253265 redis = new_redis_instance
254266 begin
255- ( 0 ..MAX_ITEMS_PER_WORKER ) . each do
267+ ( 0 ..@max_items_per_worker ) . each do
256268 if stop?
257269 @logger . debug ( "Breaking from thread #{ key } as it was requested to stop" )
258270 break
@@ -304,7 +316,7 @@ def list_pattern_runner(output_queue)
304316 begin
305317 init_list_pattern_runner if @redis . nil?
306318 ensure_workers ( output_queue )
307- sleep ( 0.1 )
319+ sleep ( @threadpool_queue_sleep )
308320 rescue ::Redis ::BaseError => e
309321 @logger . warn ( "Redis connection problem" , :exception => e )
310322 @redis = nil
0 commit comments