@@ -122,8 +122,6 @@ def register
122122 @stop_method = method ( :subscribe_stop )
123123 end
124124
125- @list_method = batched? ? method ( :list_batch_listener ) : method ( :list_single_listener )
126-
127125 @identity = "#{ @redis_url } #{ @data_type } :#{ @key } "
128126 @logger . info ( "Registering Redis" , :identity => @identity )
129127 end # def register
@@ -147,7 +145,7 @@ def batched?
147145
148146 # private
149147 def is_list_type?
150- @data_type == 'list'
148+ @data_type == 'list' || @data_type == 'pattern_list'
151149 end
152150
153151 # private
@@ -235,6 +233,7 @@ def list_stop
235233
236234 # private
237235 def list_runner ( output_queue )
236+ @list_method = batched? ? method ( :list_batch_listener ) : method ( :list_single_listener )
238237 while !stop?
239238 begin
240239 @redis ||= connect
@@ -264,28 +263,53 @@ def list_pattern_stop
264263 reset_threadpool
265264 end
266265
266+ # private
267+ def list_pattern_process_item ( redis , output_queue , key )
268+ if stop?
269+ @logger . debug ( "Breaking from thread #{ key } as it was requested to stop" )
270+ return false
271+ end
272+ value = redis . lpop ( key )
273+ return false if value . nil?
274+ queue_event ( value , output_queue )
275+ true
276+ end
277+
278+ # private
279+ def list_pattern_single_processor ( redis , output_queue , key )
280+ ( 0 ...@pattern_list_max_items ) . each do
281+ break unless list_pattern_process_item ( redis , output_queue , key )
282+ end
283+ end
284+
285+ # private
286+ def list_pattern_batch_processor ( redis , output_queue , key )
287+ items_left = @pattern_list_max_items
288+ while items_left > 0
289+ limit = [ items_left , @batch_count ] . min
290+ processed = process_batch ( redis , output_queue , key , limit , 0 )
291+ @logger . warn ( "Got batch size #{ processed } for #{ key } " )
292+ if processed . zero? || processed < limit
293+ return
294+ end
295+ items_left -= processed
296+ end
297+ end
298+
267299 # private
268300 def worker_consume ( output_queue , key )
269301 @logger . debug ( "Launched worker for #{ key } " )
270302 begin
271303 redis ||= connect
272- ( 0 ...@pattern_list_max_items ) . each do
273- if stop?
274- @logger . debug ( "Breaking from thread #{ key } as it was requested to stop" )
275- break
276- end
277- value = redis . lpop ( key )
278- break if value . nil?
279- queue_event ( value , output_queue )
280- end
304+ @list_pattern_processor . call ( redis , output_queue , key )
281305 rescue ::Redis ::BaseError => e
282306 @logger . warn ( "Redis connection problem in thread for key #{ key } . Sleeping a while before exiting thread." , :exception => e )
283307 sleep 1
284308 return
285309 ensure
286310 redis . quit rescue nil
287311 end
288- @logger . debug ( "Exit worker for #{ key } " )
312+ @logger . warn ( "Exit worker for #{ key } " )
289313 end
290314
291315 # private
@@ -319,23 +343,22 @@ def ensure_workers(output_queue)
319343
320344 # private
321345 def list_pattern_runner ( output_queue )
346+ @list_pattern_processor = batched? ? method ( :list_pattern_batch_processor ) : method ( :list_pattern_single_processor )
322347 while !stop?
323348 init_threadpool if @threadpool . nil?
324349 ensure_workers ( output_queue )
325350 sleep ( @pattern_list_threadpool_sleep )
326351 end
327352 end
328353
329- def list_batch_listener ( redis , output_queue )
354+ def process_batch ( redis , output_queue , key , batch_size , sleep_time )
330355 begin
331- results = redis . evalsha ( @redis_script_sha , [ @ key] , [ @batch_count -1 ] )
356+ results = redis . evalsha ( @redis_script_sha , [ key ] , [ batch_size -1 ] )
332357 results . each do |item |
333358 queue_event ( item , output_queue )
334359 end
335-
336- if results . size . zero?
337- sleep BATCH_EMPTY_SLEEP
338- end
360+ sleep sleep_time if results . size . zero? && sleep_time > 0
361+ results . size
339362
340363 # Below is a commented-out implementation of 'batch fetch'
341364 # using pipelined LPOP calls. This in practice has been observed to
@@ -364,6 +387,10 @@ def list_batch_listener(redis, output_queue)
364387 end
365388 end
366389
390+ def list_batch_listener ( redis , output_queue )
391+ process_batch ( redis , output_queue , @key , @batch_count , BATCH_EMPTY_SLEEP )
392+ end
393+
367394 def list_single_listener ( redis , output_queue )
368395 item = redis . blpop ( @key , 0 , :timeout => 1 )
369396 return unless item # from timeout or other conditions
0 commit comments