55module Concurrent
66
77 module Executor
8-
8+ # The policy defining how rejected tasks (tasks received once the queue size reaches
9+ # the configured `max_queue`) are handled. Must be one of the values specified in
10+ # `OVERFLOW_POLICIES`.
11+ attr_reader :overflow_policy
12+
913 # @!macro [attach] executor_module_method_can_overflow_question
1014 #
1115 # Does the task queue have a maximum size?
@@ -17,6 +21,31 @@ def can_overflow?
1721 false
1822 end
1923
24+ # Handler which executes the `overflow_policy` once the queue size
25+ # reaches `max_queue`.
26+ #
27+ # @param [Array] args the arguments to the task which is being handled.
28+ #
29+ # @!visibility private
30+ def handle_overflow ( *args )
31+ case @overflow_policy
32+ when :abort
33+ raise RejectedExecutionError
34+ when :discard
35+ false
36+ when :caller_runs
37+ begin
38+ yield ( *args )
39+ rescue => ex
40+ # let it fail
41+ log DEBUG , ex
42+ end
43+ true
44+ else
45+ fail "Unknown overflow policy #{ @overflow_policy } "
46+ end
47+ end
48+
2049 # @!macro [attach] executor_module_method_serialized_question
2150 #
2251 # Does this executor guarantee serialization of its operations?
@@ -63,6 +92,9 @@ module RubyExecutor
6392 include Executor
6493 include Logging
6594
95+ # The set of possible overflow policies that may be set at thread pool creation.
96+ OVERFLOW_POLICIES = [ :abort , :discard , :caller_runs ]
97+
6698 # @!macro [attach] executor_method_post
6799 #
68100 # Submit a task to the executor for asynchronous processing.
@@ -78,16 +110,8 @@ module RubyExecutor
78110 def post ( *args , &task )
79111 raise ArgumentError . new ( 'no block given' ) unless block_given?
80112 mutex . synchronize do
81- unless running?
82- # The executor is shut down - figure out how to reject this task
83- if self . respond_to? ( :handle_overflow , true )
84- # Reject this task in the same way we'd reject an overflow
85- return handle_overflow ( *args , &task )
86- else
87- # No handle_overflow method defined - just return false
88- return false
89- end
90- end
113+ # If the executor is shut down, reject this task
114+ return handle_overflow ( *args , &task ) unless running?
91115 execute ( *args , &task )
92116 true
93117 end
@@ -219,16 +243,20 @@ module JavaExecutor
219243 include Executor
220244 java_import 'java.lang.Runnable'
221245
246+ # The set of possible overflow policies that may be set at thread pool creation.
247+ OVERFLOW_POLICIES = {
248+ abort : java . util . concurrent . ThreadPoolExecutor ::AbortPolicy ,
249+ discard : java . util . concurrent . ThreadPoolExecutor ::DiscardPolicy ,
250+ caller_runs : java . util . concurrent . ThreadPoolExecutor ::CallerRunsPolicy
251+ } . freeze
252+
222253 # @!macro executor_method_post
223254 def post ( *args )
224255 raise ArgumentError . new ( 'no block given' ) unless block_given?
225- if running?
226- executor_submit = @executor . java_method ( :submit , [ Runnable . java_class ] )
227- executor_submit . call { yield ( *args ) }
228- true
229- else
230- false
231- end
256+ return handle_overflow ( *args , &task ) unless running?
257+ executor_submit = @executor . java_method ( :submit , [ Runnable . java_class ] )
258+ executor_submit . call { yield ( *args ) }
259+ true
232260 rescue Java ::JavaUtilConcurrent ::RejectedExecutionException
233261 raise RejectedExecutionError
234262 end
0 commit comments