@@ -267,8 +267,8 @@ def list_pattern_stop
267267 # private
268268 def worker_consume ( output_queue , key )
269269 @logger . debug ( "Launched worker for #{ key } " )
270- redis = new_redis_instance
271270 begin
271+ redis ||= connect
272272 ( 0 ...@pattern_list_max_items ) . each do
273273 if stop?
274274 @logger . debug ( "Breaking from thread #{ key } as it was requested to stop" )
@@ -278,6 +278,10 @@ def worker_consume(output_queue, key)
278278 break if value . nil?
279279 queue_event ( value , output_queue )
280280 end
281+ rescue ::Redis ::BaseError => e
282+ @logger . warn ( "Redis connection problem in thread for key #{ key } . Sleeping a while before exiting thread." , :exception => e )
283+ sleep 1
284+ return
281285 ensure
282286 redis . quit rescue nil
283287 end
@@ -304,31 +308,21 @@ def launch_worker(output_queue, key)
304308 # private
305309 def ensure_workers ( output_queue )
306310 return unless threadpool_capacity?
307- @redis . keys ( @key ) . shuffle . each do |key |
308- next if @current_workers . include? ( key )
309- launch_worker ( output_queue , key )
310- break unless threadpool_capacity?
311+ redis_runner do
312+ @redis . keys ( @key ) . shuffle . each do |key |
313+ next if @current_workers . include? ( key )
314+ launch_worker ( output_queue , key )
315+ break unless threadpool_capacity?
316+ end
311317 end
312318 end
313319
314- # private
315- def init_list_pattern_runner
316- init_threadpool
317- @redis ||= connect
318- end
319-
320320 # private
321321 def list_pattern_runner ( output_queue )
322322 while !stop?
323- begin
324- init_list_pattern_runner if @redis . nil?
325- ensure_workers ( output_queue )
326- sleep ( @pattern_list_threadpool_sleep )
327- rescue ::Redis ::BaseError => e
328- @logger . warn ( "Redis connection problem" , :exception => e )
329- @redis = nil
330- sleep 1
331- end
323+ init_threadpool if @threadpool . nil?
324+ ensure_workers ( output_queue )
325+ sleep ( @pattern_list_threadpool_sleep )
332326 end
333327 end
334328
0 commit comments