@@ -11,20 +11,20 @@ class RubyThreadPoolExecutor
1111 include RubyExecutor
1212
1313 # Default maximum number of threads that will be created in the pool.
14- DEFAULT_MAX_POOL_SIZE = 2 **15 # 32768
14+ DEFAULT_MAX_POOL_SIZE = 2 **15 # 32768
1515
1616 # Default minimum number of threads that will be retained in the pool.
17- DEFAULT_MIN_POOL_SIZE = 0
17+ DEFAULT_MIN_POOL_SIZE = 0
1818
1919 # Default maximum number of tasks that may be added to the task queue.
20- DEFAULT_MAX_QUEUE_SIZE = 0
20+ DEFAULT_MAX_QUEUE_SIZE = 0
2121
2222 # Default maximum number of seconds a thread in the pool may remain idle
2323 # before being reclaimed.
2424 DEFAULT_THREAD_IDLETIMEOUT = 60
2525
2626 # The set of possible overflow policies that may be set at thread pool creation.
27- OVERFLOW_POLICIES = [ :abort , :discard , :caller_runs ]
27+ OVERFLOW_POLICIES = [ :abort , :discard , :caller_runs ]
2828
2929 # The maximum number of threads that may be created in the pool.
3030 attr_reader :max_length
@@ -77,10 +77,10 @@ class RubyThreadPoolExecutor
7777 #
7878 # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
7979 def initialize ( opts = { } )
80- @min_length = opts . fetch ( :min_threads , DEFAULT_MIN_POOL_SIZE ) . to_i
81- @max_length = opts . fetch ( :max_threads , DEFAULT_MAX_POOL_SIZE ) . to_i
82- @idletime = opts . fetch ( :idletime , DEFAULT_THREAD_IDLETIMEOUT ) . to_i
83- @max_queue = opts . fetch ( :max_queue , DEFAULT_MAX_QUEUE_SIZE ) . to_i
80+ @min_length = opts . fetch ( :min_threads , DEFAULT_MIN_POOL_SIZE ) . to_i
81+ @max_length = opts . fetch ( :max_threads , DEFAULT_MAX_POOL_SIZE ) . to_i
82+ @idletime = opts . fetch ( :idletime , DEFAULT_THREAD_IDLETIMEOUT ) . to_i
83+ @max_queue = opts . fetch ( :max_queue , DEFAULT_MAX_QUEUE_SIZE ) . to_i
8484 @overflow_policy = opts . fetch ( :overflow_policy , :abort )
8585
8686 raise ArgumentError . new ( 'max_threads must be greater than zero' ) if @max_length <= 0
@@ -90,13 +90,13 @@ def initialize(opts = {})
9090
9191 init_executor
9292
93- @pool = [ ]
94- @queue = Queue . new
93+ @pool = [ ]
94+ @queue = Queue . new
9595 @scheduled_task_count = 0
9696 @completed_task_count = 0
97- @largest_length = 0
97+ @largest_length = 0
9898
99- @gc_interval = opts . fetch ( :gc_interval , 1 ) . to_i # undocumented
99+ @gc_interval = opts . fetch ( :gc_interval , 1 ) . to_i # undocumented
100100 @last_gc_time = Time . now . to_f - [ 1.0 , ( @gc_interval * 2.0 ) ] . max
101101 end
102102
@@ -109,15 +109,16 @@ def can_overflow?
109109 #
110110 # @return [Integer] the length
111111 def length
112- mutex . synchronize { running? ? @pool . length : 0 }
112+ mutex . synchronize { running? ? @pool . length : 0 }
113113 end
114+
114115 alias_method :current_length , :length
115116
116117 # The number of tasks in the queue awaiting execution.
117118 #
118119 # @return [Integer] the queue_length
119120 def queue_length
120- mutex . synchronize { running? ? @queue . length : 0 }
121+ mutex . synchronize { running? ? @queue . length : 0 }
121122 end
122123
123124 # Number of tasks that may be enqueued before reaching `max_queue` and rejecting
@@ -152,7 +153,7 @@ def on_end_task
152153 def on_worker_exit ( worker )
153154 mutex . synchronize do
154155 @pool . delete ( worker )
155- if @pool . empty? && ! running?
156+ if @pool . empty? && !running?
156157 stop_event . set
157158 stopped_event . set
158159 end
@@ -177,7 +178,7 @@ def shutdown_execution
177178 if @pool . empty?
178179 stopped_event . set
179180 else
180- @pool . length . times { @queue << :stop }
181+ @pool . length . times { @queue << :stop }
181182 end
182183 end
183184
@@ -196,7 +197,7 @@ def kill_execution
196197 # @!visibility private
197198 def ensure_capacity?
198199 additional = 0
199- capacity = true
200+ capacity = true
200201
201202 if @pool . size < @min_length
202203 additional = @min_length - @pool . size
@@ -254,10 +255,11 @@ def handle_overflow(*args)
254255 # @!visibility private
255256 def prune_pool
256257 if Time . now . to_f - @gc_interval >= @last_gc_time
257- @pool . delete_if do |worker |
258- worker . dead? ||
259- ( @idletime == 0 ? false : Time . now . to_f - @idletime > worker . last_activity )
260- end
258+ @pool . delete_if { |worker | worker . dead? }
259+ # send :stop for each thread over idletime
260+ @pool .
261+ select { |worker | @idletime != 0 && Time . now . to_f - @idletime > worker . last_activity } .
262+ each { @queue << :stop }
261263 @last_gc_time = Time . now . to_f
262264 end
263265 end
@@ -266,7 +268,7 @@ def prune_pool
266268 #
267269 # @!visibility private
268270 def drain_pool
269- @pool . each { |worker | worker . kill }
271+ @pool . each { |worker | worker . kill }
270272 @pool . clear
271273 end
272274
0 commit comments