Skip to content
This repository was archived by the owner on Oct 15, 2025. It is now read-only.

Commit 8152028

Browse files
author
Juuso Mäyränen
committed
Support list patterns
1 parent 73f0577 commit 8152028

File tree

1 file changed

+88
-1
lines changed

1 file changed

+88
-1
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require "logstash/inputs/base"
44
require "logstash/inputs/threadable"
55
require 'redis'
6+
require 'concurrent/atomics'
67

78
# This input will read events from a Redis instance; it supports both Redis channels and lists.
89
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
@@ -18,6 +19,7 @@
1819
#
1920
module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
2021
BATCH_EMPTY_SLEEP = 0.25
22+
MAX_ITEMS_PER_WORKER = 1000
2123

2224
config_name "redis"
2325

@@ -51,14 +53,16 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
5153
# Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
5254
# key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
5355
# If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
54-
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
56+
config :data_type, :validate => [ "list", "list_pattern", "channel", "pattern_channel" ], :required => true
5557

5658
# The number of events to return from Redis using EVAL.
5759
config :batch_count, :validate => :number, :default => 125
5860

5961
# Redefined Redis commands to be passed to the Redis client.
6062
config :command_map, :validate => :hash, :default => {}
6163

64+
config :worker_thread_count, :validate => :number, :default => 20
65+
6266
public
6367
# public API
6468
# use to store a proc that can provide a Redis instance or mock
@@ -86,6 +90,9 @@ def register
8690
if @data_type == 'list' || @data_type == 'dummy'
8791
@run_method = method(:list_runner)
8892
@stop_method = method(:list_stop)
93+
elsif @data_type == 'list_pattern'
94+
@run_method = method(:list_pattern_runner)
95+
@stop_method = method(:list_pattern_stop)
8996
elsif @data_type == 'channel'
9097
@run_method = method(:channel_runner)
9198
@stop_method = method(:subscribe_stop)
@@ -217,6 +224,86 @@ def list_runner(output_queue)
217224
end
218225
end
219226

227+
# private
228+
def list_pattern_stop
229+
return if @redis.nil? || !@redis.connected?
230+
231+
@redis.quit rescue nil
232+
@redis = nil
233+
@worker_threads.each do |key, thread|
234+
if thread.alive?
235+
thread.join(5)
236+
@logger.warn("Thread #{key} joined")
237+
end
238+
end
239+
end
240+
241+
def launch_worker(output_queue, key)
242+
Thread.new do
243+
redis = new_redis_instance
244+
begin
245+
(0..MAX_ITEMS_PER_WORKER).each do
246+
if stop?
247+
@logger.warn("Breaking from thread #{key} as it was requested to stop")
248+
break
249+
end
250+
value = redis.lpop(key)
251+
break if value.nil?
252+
queue_event(value, output_queue)
253+
@items_processed.increment
254+
end
255+
ensure
256+
redis.quit rescue nil
257+
end
258+
end
259+
end
260+
261+
def clear_finished_workers
262+
finished_threads = []
263+
@worker_threads.each do |key, thread|
264+
next if thread.alive?
265+
finished_threads << key
266+
end
267+
finished_threads.each { |key| @worker_threads.delete(key) }
268+
end
269+
270+
def ensure_workers(output_queue)
271+
free_slots = @worker_thread_count - @worker_threads.length
272+
return if free_slots == 0
273+
keys = @redis.keys(@key)
274+
keys.shuffle
275+
keys.each do |key|
276+
next if @worker_threads.has_key?(key)
277+
@worker_threads[key] = launch_worker(output_queue, key)
278+
free_slots -= 1
279+
break if free_slots == 0
280+
end
281+
end
282+
283+
def init_list_pattern_runner(output_queue)
284+
@worker_threads ||= {}
285+
@redis ||= connect
286+
@items_processed ||= Concurrent::AtomicFixnum.new
287+
clear_finished_workers
288+
ensure_workers(output_queue)
289+
end
290+
291+
# private
292+
def list_pattern_runner(output_queue)
293+
items = 0
294+
while !stop?
295+
begin
296+
init_list_pattern_runner(output_queue)
297+
sleep(0.1)
298+
rescue ::Redis::BaseError => e
299+
@logger.warn("Redis connection problem", :exception => e)
300+
@redis = nil
301+
sleep 1
302+
end
303+
end
304+
@logger.warn("Total items processed: #{@items_processed}")
305+
end
306+
220307
def list_batch_listener(redis, output_queue)
221308
begin
222309
results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])

0 commit comments

Comments
 (0)