Skip to content
This repository was archived by the owner on Oct 15, 2025. It is now read-only.

Commit 7256fd6

Browse files
author
Juuso Mäyränen
committed
Refactor and add some basic tests
1 parent b4ed7d6 commit 7256fd6

File tree

2 files changed

+95
-13
lines changed

2 files changed

+95
-13
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
5555
# Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
5656
# key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
5757
# If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
58-
config :data_type, :validate => [ "list", "list_pattern", "channel", "pattern_channel" ], :required => true
58+
config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true
5959

6060
# The number of events to return from Redis using EVAL.
6161
config :batch_count, :validate => :number, :default => 125
@@ -83,6 +83,17 @@ def new_redis_instance
8383
@redis_builder.call
8484
end
8585

86+
def init_threadpool
87+
@threadpool_queue_max ||= 2 * @worker_thread_count
88+
@threadpool ||= Concurrent::ThreadPoolExecutor.new(
89+
min_threads: @worker_thread_count,
90+
max_threads: @worker_thread_count,
91+
max_queue: @threadpool_queue_max
92+
)
93+
@items_processed ||= Concurrent::AtomicFixnum.new
94+
@current_workers ||= Concurrent::Set.new
95+
end
96+
8697
def register
8798
@redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"
8899

@@ -92,7 +103,7 @@ def register
92103
if @data_type == 'list' || @data_type == 'dummy'
93104
@run_method = method(:list_runner)
94105
@stop_method = method(:list_stop)
95-
elsif @data_type == 'list_pattern'
106+
elsif @data_type == 'pattern_list'
96107
@run_method = method(:list_pattern_runner)
97108
@stop_method = method(:list_pattern_stop)
98109
elsif @data_type == 'channel'
@@ -282,23 +293,16 @@ def ensure_workers(output_queue)
282293
end
283294

284295
# private
285-
def init_list_pattern_runner(output_queue)
286-
@threadpool_queue_max ||= 2 * @worker_thread_count
287-
@threadpool ||= Concurrent::ThreadPoolExecutor.new(
288-
min_threads: @worker_thread_count,
289-
max_threads: @worker_thread_count,
290-
max_queue: @threadpool_queue_max
291-
)
292-
@items_processed ||= Concurrent::AtomicFixnum.new
293-
@current_workers ||= Concurrent::Set.new
296+
def init_list_pattern_runner
297+
init_threadpool
294298
@redis ||= connect
295299
end
296300

297301
# private
298302
def list_pattern_runner(output_queue)
299303
while !stop?
300304
begin
301-
init_list_pattern_runner(output_queue) if @redis.nil?
305+
init_list_pattern_runner if @redis.nil?
302306
ensure_workers(output_queue)
303307
sleep(0.1)
304308
rescue ::Redis::BaseError => e

spec/inputs/redis_spec.rb

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,26 @@ def process(conf, event_count)
6363
populate(key, event_count)
6464
process(conf, event_count)
6565
end
66+
67+
it "should read events from a list pattern" do
68+
key_base = SecureRandom.hex
69+
conf = <<-CONFIG
70+
input {
71+
redis {
72+
type => "blah"
73+
key => "#{key}.*"
74+
data_type => "list_pattern"
75+
}
76+
}
77+
CONFIG
78+
total_event_count = 0
79+
(0..10).each do |idx|
80+
event_count = 100 + rand(50)
81+
total_event_count += event_count
82+
populate("#{key_base}.#{idx}", event_count)
83+
end
84+
process(conf, total_event_count)
85+
end
6686
end
6787

6888
# unit tests ---------------------
@@ -264,6 +284,64 @@ def process(conf, event_count)
264284
end
265285
end
266286

287+
context 'runtime for pattern_list data_type' do
288+
let(:data_type) { 'pattern_list' }
289+
let(:key) { 'foo.*' }
290+
before do
291+
subject.register
292+
subject.init_threadpool
293+
end
294+
295+
context 'close when redis is unset' do
296+
let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] }
297+
298+
it 'does not attempt to quit' do
299+
allow(redis).to receive(:nil?).and_return(true)
300+
quit_calls.each do |call|
301+
expect(redis).not_to receive(call)
302+
end
303+
expect {subject.do_stop}.not_to raise_error
304+
end
305+
end
306+
307+
it 'calling the run method, adds events to the queue' do
308+
expect(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
309+
expect(redis).to receive(:lpop).at_least(:once).and_return('l1')
310+
311+
allow(redis).to receive(:connected?).and_return(connected.last)
312+
allow(redis).to receive(:quit)
313+
314+
tt = Thread.new do
315+
end_by = Time.now + 3
316+
while accumulator.size < 1 and Time.now <= end_by
317+
sleep 0.1
318+
end
319+
subject.do_stop
320+
end
321+
322+
subject.run(accumulator)
323+
324+
tt.join
325+
326+
expect(accumulator.size).to be > 0
327+
end
328+
329+
it 'multiple close calls, calls to redis once' do
330+
subject.use_redis(redis)
331+
allow(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
332+
allow(redis).to receive(:lpop).and_return('l1')
333+
expect(redis).to receive(:connected?).and_return(connected.last)
334+
quit_calls.each do |call|
335+
expect(redis).to receive(call).at_most(:once)
336+
end
337+
338+
subject.do_stop
339+
connected.push(false) #can't use let block here so push to array
340+
expect {subject.do_stop}.not_to raise_error
341+
subject.do_stop
342+
end
343+
end
344+
267345
context 'for the subscribe data_types' do
268346
def run_it_thread(inst)
269347
Thread.new(inst) do |subj|
@@ -396,7 +474,7 @@ def close_thread(inst, rt)
396474

397475
describe LogStash::Inputs::Redis do
398476
context "when using data type" do
399-
["list", "channel", "pattern_channel"].each do |data_type|
477+
["list", "channel", "pattern_channel", "pattern_list"].each do |data_type|
400478
context data_type do
401479
it_behaves_like "an interruptible input plugin" do
402480
let(:config) { {'key' => 'foo', 'data_type' => data_type } }

0 commit comments

Comments
 (0)