Skip to content
This repository was archived by the owner on Oct 15, 2025. It is now read-only.

Commit b4ed7d6

Browse files
author
Juuso Mäyränen
committed
Use thread pools
1 parent c437cc9 commit b4ed7d6

File tree

1 file changed

+46
-40
lines changed

1 file changed

+46
-40
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
require "logstash/inputs/base"
44
require "logstash/inputs/threadable"
55
require 'redis'
6+
require 'concurrent'
67
require 'concurrent/atomics'
8+
require 'concurrent/executors'
79

810
# This input will read events from a Redis instance; it supports both Redis channels and lists.
911
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
@@ -230,77 +232,81 @@ def list_pattern_stop
230232

231233
@redis.quit rescue nil
232234
@redis = nil
233-
@worker_threads.each do |key, thread|
234-
if thread.alive?
235-
thread.join(5)
236-
@logger.warn("Thread #{key} joined")
237-
end
238-
end
235+
@threadpool.shutdown
236+
@threadpool.wait_for_termination
239237
end
240238

239+
# private
241240
def launch_worker(output_queue, key)
242-
Thread.new do
243-
redis = new_redis_instance
244-
begin
245-
(0..MAX_ITEMS_PER_WORKER).each do
246-
if stop?
247-
@logger.warn("Breaking from thread #{key} as it was requested to stop")
248-
break
249-
end
250-
value = redis.lpop(key)
251-
break if value.nil?
252-
queue_event(value, output_queue)
253-
@items_processed.increment
241+
@logger.debug("Launched worker for #{key}")
242+
redis = new_redis_instance
243+
begin
244+
(0..MAX_ITEMS_PER_WORKER).each do
245+
if stop?
246+
@logger.debug("Breaking from thread #{key} as it was requested to stop")
247+
break
254248
end
255-
ensure
256-
redis.quit rescue nil
249+
value = redis.lpop(key)
250+
break if value.nil?
251+
queue_event(value, output_queue)
252+
@items_processed.increment
257253
end
254+
ensure
255+
redis.quit rescue nil
258256
end
257+
@logger.debug("Exit worker for #{key}")
259258
end
260259

261-
def clear_finished_workers
262-
finished_threads = []
263-
@worker_threads.each do |key, thread|
264-
next if thread.alive?
265-
finished_threads << key
266-
end
267-
finished_threads.each { |key| @worker_threads.delete(key) }
260+
# private
261+
def threadpool_capacity?
262+
@threadpool.remaining_capacity > 0
268263
end
269264

265+
# private
270266
def ensure_workers(output_queue)
271-
free_slots = @worker_thread_count - @worker_threads.length
272-
return if free_slots == 0
267+
return unless threadpool_capacity?
273268
keys = @redis.keys(@key)
274269
keys.shuffle
275270
keys.each do |key|
276-
next if @worker_threads.has_key?(key)
277-
@worker_threads[key] = launch_worker(output_queue, key)
278-
free_slots -= 1
279-
break if free_slots == 0
271+
break unless threadpool_capacity?
272+
next if @current_workers.include?(key)
273+
@current_workers.add(key)
274+
@threadpool.post do
275+
begin
276+
launch_worker(output_queue, key)
277+
ensure
278+
@current_workers.delete(key)
279+
end
280+
end
280281
end
281282
end
282283

284+
# private
283285
def init_list_pattern_runner(output_queue)
284-
@worker_threads ||= {}
285-
@redis ||= connect
286+
@threadpool_queue_max ||= 2 * @worker_thread_count
287+
@threadpool ||= Concurrent::ThreadPoolExecutor.new(
288+
min_threads: @worker_thread_count,
289+
max_threads: @worker_thread_count,
290+
max_queue: @threadpool_queue_max
291+
)
286292
@items_processed ||= Concurrent::AtomicFixnum.new
287-
clear_finished_workers
288-
ensure_workers(output_queue)
293+
@current_workers ||= Concurrent::Set.new
294+
@redis ||= connect
289295
end
290296

291297
# private
292298
def list_pattern_runner(output_queue)
293-
items = 0
294299
while !stop?
295300
begin
296-
init_list_pattern_runner(output_queue)
301+
init_list_pattern_runner(output_queue) if @redis.nil?
302+
ensure_workers(output_queue)
297303
sleep(0.1)
298304
rescue ::Redis::BaseError => e
299305
@logger.warn("Redis connection problem", :exception => e)
300306
@redis = nil
301307
end
302308
end
303-
@logger.warn("Total items processed: #{@items_processed}")
309+
@logger.debug("Total items processed: #{@items_processed.value}")
304310
end
305311

306312
def list_batch_listener(redis, output_queue)

0 commit comments

Comments
 (0)