Skip to content
This repository was archived by the owner on Oct 15, 2025. It is now read-only.

Commit 23be161

Browse files
author
Juuso Mäyränen
committed
Use thread pools
1 parent 8152028 commit 23be161

File tree

2 files changed

+162
-56
lines changed

2 files changed

+162
-56
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 83 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
require "logstash/inputs/base"
44
require "logstash/inputs/threadable"
55
require 'redis'
6-
require 'concurrent/atomics'
6+
require 'concurrent'
7+
require 'concurrent/executors'
78

89
# This input will read events from a Redis instance; it supports both Redis channels and lists.
910
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
@@ -19,7 +20,6 @@
1920
#
2021
module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
2122
BATCH_EMPTY_SLEEP = 0.25
22-
MAX_ITEMS_PER_WORKER = 1000
2323

2424
config_name "redis"
2525

@@ -51,17 +51,29 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
5151
config :key, :validate => :string, :required => true
5252

5353
# Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
54-
# key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
55-
# If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
56-
config :data_type, :validate => [ "list", "list_pattern", "channel", "pattern_channel" ], :required => true
54+
# key. If `data_type` is `pattern_list`, then we will spawn a number of worker
55+
# threads that will LPOP from keys matching that pattern. If `data_type` is
56+
# `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`,
57+
# then we will PSUBSCRIBE to the key.
58+
config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true
5759

5860
# The number of events to return from Redis using EVAL.
5961
config :batch_count, :validate => :number, :default => 125
6062

6163
# Redefined Redis commands to be passed to the Redis client.
6264
config :command_map, :validate => :hash, :default => {}
6365

64-
config :worker_thread_count, :validate => :number, :default => 20
66+
# Maximum number of worker threads to spawn when using `data_type` `pattern_list`.
67+
config :pattern_list_threads, :validate => :number, :default => 20
68+
69+
# Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`.
70+
# After the list is empty or this number of items have been processed, the thread will exit and a
71+
# new one will be started if there are non-empty lists matching the pattern without a consumer.
72+
config :pattern_list_max_items, :validate => :number, :default => 1000
73+
74+
# Time to sleep in main loop after checking if more threads can/need to be spawned.
75+
# Applies to `data_type` is `pattern_list`
76+
config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2
6577

6678
public
6779
# public API
@@ -81,6 +93,15 @@ def new_redis_instance
8193
@redis_builder.call
8294
end
8395

96+
def init_threadpool
97+
@threadpool ||= Concurrent::ThreadPoolExecutor.new(
98+
min_threads: @pattern_list_threads,
99+
max_threads: @pattern_list_threads,
100+
max_queue: 2 * @pattern_list_threads
101+
)
102+
@current_workers ||= Concurrent::Set.new
103+
end
104+
84105
def register
85106
@redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"
86107

@@ -90,7 +111,7 @@ def register
90111
if @data_type == 'list' || @data_type == 'dummy'
91112
@run_method = method(:list_runner)
92113
@stop_method = method(:list_stop)
93-
elsif @data_type == 'list_pattern'
114+
elsif @data_type == 'pattern_list'
94115
@run_method = method(:list_pattern_runner)
95116
@stop_method = method(:list_pattern_stop)
96117
elsif @data_type == 'channel'
@@ -200,13 +221,18 @@ def queue_event(msg, output_queue, channel=nil)
200221
end
201222

202223
# private
203-
def list_stop
224+
def reset_redis
204225
return if @redis.nil? || !@redis.connected?
205226

206227
@redis.quit rescue nil
207228
@redis = nil
208229
end
209230

231+
# private
232+
def list_stop
233+
reset_redis
234+
end
235+
210236
# private
211237
def list_runner(output_queue)
212238
while !stop?
@@ -224,84 +250,86 @@ def list_runner(output_queue)
224250
end
225251
end
226252

253+
#private
254+
def reset_threadpool
255+
return if @threadpool.nil?
256+
@threadpool.shutdown
257+
@threadpool.wait_for_termination
258+
@threadpool = nil
259+
end
260+
227261
# private
228262
def list_pattern_stop
229-
return if @redis.nil? || !@redis.connected?
263+
reset_redis
264+
reset_threadpool
265+
end
230266

231-
@redis.quit rescue nil
232-
@redis = nil
233-
@worker_threads.each do |key, thread|
234-
if thread.alive?
235-
thread.join(5)
236-
@logger.warn("Thread #{key} joined")
267+
# private
268+
def worker_consume(output_queue, key)
269+
@logger.debug("Launched worker for #{key}")
270+
redis = new_redis_instance
271+
begin
272+
(0...@pattern_list_max_items).each do
273+
if stop?
274+
@logger.debug("Breaking from thread #{key} as it was requested to stop")
275+
break
276+
end
277+
value = redis.lpop(key)
278+
break if value.nil?
279+
queue_event(value, output_queue)
237280
end
281+
ensure
282+
redis.quit rescue nil
238283
end
284+
@logger.debug("Exit worker for #{key}")
285+
end
286+
287+
# private
288+
def threadpool_capacity?
289+
@threadpool.remaining_capacity > 0
239290
end
240291

292+
# private
241293
def launch_worker(output_queue, key)
242-
Thread.new do
243-
redis = new_redis_instance
294+
@current_workers.add(key)
295+
@threadpool.post do
244296
begin
245-
(0..MAX_ITEMS_PER_WORKER).each do
246-
if stop?
247-
@logger.warn("Breaking from thread #{key} as it was requested to stop")
248-
break
249-
end
250-
value = redis.lpop(key)
251-
break if value.nil?
252-
queue_event(value, output_queue)
253-
@items_processed.increment
254-
end
297+
worker_consume(output_queue, key)
255298
ensure
256-
redis.quit rescue nil
299+
@current_workers.delete(key)
257300
end
258301
end
259302
end
260303

261-
def clear_finished_workers
262-
finished_threads = []
263-
@worker_threads.each do |key, thread|
264-
next if thread.alive?
265-
finished_threads << key
266-
end
267-
finished_threads.each { |key| @worker_threads.delete(key) }
268-
end
269-
304+
# private
270305
def ensure_workers(output_queue)
271-
free_slots = @worker_thread_count - @worker_threads.length
272-
return if free_slots == 0
273-
keys = @redis.keys(@key)
274-
keys.shuffle
275-
keys.each do |key|
276-
next if @worker_threads.has_key?(key)
277-
@worker_threads[key] = launch_worker(output_queue, key)
278-
free_slots -= 1
279-
break if free_slots == 0
306+
return unless threadpool_capacity?
307+
@redis.keys(@key).shuffle.each do |key|
308+
next if @current_workers.include?(key)
309+
launch_worker(output_queue, key)
310+
break unless threadpool_capacity?
280311
end
281312
end
282313

283-
def init_list_pattern_runner(output_queue)
284-
@worker_threads ||= {}
314+
# private
315+
def init_list_pattern_runner
316+
init_threadpool
285317
@redis ||= connect
286-
@items_processed ||= Concurrent::AtomicFixnum.new
287-
clear_finished_workers
288-
ensure_workers(output_queue)
289318
end
290319

291320
# private
292321
def list_pattern_runner(output_queue)
293-
items = 0
294322
while !stop?
295323
begin
296-
init_list_pattern_runner(output_queue)
297-
sleep(0.1)
324+
init_list_pattern_runner if @redis.nil?
325+
ensure_workers(output_queue)
326+
sleep(@pattern_list_threadpool_sleep)
298327
rescue ::Redis::BaseError => e
299328
@logger.warn("Redis connection problem", :exception => e)
300329
@redis = nil
301330
sleep 1
302331
end
303332
end
304-
@logger.warn("Total items processed: #{@items_processed}")
305333
end
306334

307335
def list_batch_listener(redis, output_queue)

spec/inputs/redis_spec.rb

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,26 @@ def process(conf, event_count)
6363
populate(key, event_count)
6464
process(conf, event_count)
6565
end
66+
67+
it "should read events from a list pattern" do
68+
key_base = SecureRandom.hex
69+
conf = <<-CONFIG
70+
input {
71+
redis {
72+
type => "blah"
73+
key => "#{key}.*"
74+
data_type => "list_pattern"
75+
}
76+
}
77+
CONFIG
78+
total_event_count = 0
79+
(0..10).each do |idx|
80+
event_count = 100 + rand(50)
81+
total_event_count += event_count
82+
populate("#{key_base}.#{idx}", event_count)
83+
end
84+
process(conf, total_event_count)
85+
end
6686
end
6787

6888
# unit tests ---------------------
@@ -264,6 +284,64 @@ def process(conf, event_count)
264284
end
265285
end
266286

287+
context 'runtime for pattern_list data_type' do
288+
let(:data_type) { 'pattern_list' }
289+
let(:key) { 'foo.*' }
290+
before do
291+
subject.register
292+
subject.init_threadpool
293+
end
294+
295+
context 'close when redis is unset' do
296+
let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] }
297+
298+
it 'does not attempt to quit' do
299+
allow(redis).to receive(:nil?).and_return(true)
300+
quit_calls.each do |call|
301+
expect(redis).not_to receive(call)
302+
end
303+
expect {subject.do_stop}.not_to raise_error
304+
end
305+
end
306+
307+
it 'calling the run method, adds events to the queue' do
308+
expect(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
309+
expect(redis).to receive(:lpop).at_least(:once).and_return('l1')
310+
311+
allow(redis).to receive(:connected?).and_return(connected.last)
312+
allow(redis).to receive(:quit)
313+
314+
tt = Thread.new do
315+
end_by = Time.now + 3
316+
while accumulator.size < 1 and Time.now <= end_by
317+
sleep 0.1
318+
end
319+
subject.do_stop
320+
end
321+
322+
subject.run(accumulator)
323+
324+
tt.join
325+
326+
expect(accumulator.size).to be > 0
327+
end
328+
329+
it 'multiple close calls, calls to redis once' do
330+
subject.use_redis(redis)
331+
allow(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
332+
allow(redis).to receive(:lpop).and_return('l1')
333+
expect(redis).to receive(:connected?).and_return(connected.last)
334+
quit_calls.each do |call|
335+
expect(redis).to receive(call).at_most(:once)
336+
end
337+
338+
subject.do_stop
339+
connected.push(false) #can't use let block here so push to array
340+
expect {subject.do_stop}.not_to raise_error
341+
subject.do_stop
342+
end
343+
end
344+
267345
context 'for the subscribe data_types' do
268346
def run_it_thread(inst)
269347
Thread.new(inst) do |subj|
@@ -396,7 +474,7 @@ def close_thread(inst, rt)
396474

397475
describe LogStash::Inputs::Redis do
398476
context "when using data type" do
399-
["list", "channel", "pattern_channel"].each do |data_type|
477+
["list", "channel", "pattern_channel", "pattern_list"].each do |data_type|
400478
context data_type do
401479
it_behaves_like "an interruptible input plugin" do
402480
let(:config) { {'key' => 'foo', 'data_type' => data_type } }

0 commit comments

Comments
 (0)