This repository was archived by the owner on Oct 15, 2025. It is now read-only.
File tree Expand file tree Collapse file tree 1 file changed +15
-10
lines changed Expand file tree Collapse file tree 1 file changed +15
-10
lines changed Original file line number Diff line number Diff line change @@ -260,11 +260,11 @@ def list_pattern_stop
260260 end
261261
262262 # private
263- def launch_worker ( output_queue , key )
263+ def worker_consume ( output_queue , key )
264264 @logger . debug ( "Launched worker for #{ key } " )
265265 redis = new_redis_instance
266266 begin
267- ( 0 ..@max_items_per_worker ) . each do
267+ ( 0 ..@max_items_per_worker - 1 ) . each do
268268 if stop?
269269 @logger . debug ( "Breaking from thread #{ key } as it was requested to stop" )
270270 break
@@ -285,21 +285,26 @@ def threadpool_capacity?
285285 @threadpool . remaining_capacity > 0
286286 end
287287
288+ # private
289+ def launch_worker ( output_queue , key )
290+ @current_workers . add ( key )
291+ @threadpool . post do
292+ begin
293+ worker_consume ( output_queue , key )
294+ ensure
295+ @current_workers . delete ( key )
296+ end
297+ end
298+ end
299+
288300 # private
289301 def ensure_workers ( output_queue )
290302 return unless threadpool_capacity?
291303 keys = @redis . keys ( @key )
292304 keys . shuffle
293305 keys . each do |key |
294306 next if @current_workers . include? ( key )
295- @current_workers . add ( key )
296- @threadpool . post do
297- begin
298- launch_worker ( output_queue , key )
299- ensure
300- @current_workers . delete ( key )
301- end
302- end
307+ launch_worker ( output_queue , key )
303308 break unless threadpool_capacity?
304309 end
305310 end
You can’t perform that action at this time.
0 commit comments