@@ -40,7 +40,7 @@ class Agent
4040 # is given at initialization
4141 TIMEOUT = 5
4242
43- attr_reader :timeout , :executor
43+ attr_reader :timeout , :task_executor , :operation_executor
4444
4545 # Initialize a new Agent with the given initial value and provided options.
4646 #
@@ -60,12 +60,14 @@ class Agent
6060 # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6161 # returning the value returned from the proc
6262 def initialize ( initial , opts = { } )
63- @value = initial
64- @rescuers = [ ]
65- @validator = Proc . new { |result | true }
66- @timeout = opts . fetch ( :timeout , TIMEOUT ) . freeze
67- self . observers = CopyOnWriteObserverSet . new
68- @executor = OneByOne . new OptionsParser ::get_executor_from ( opts )
63+ @value = initial
64+ @rescuers = [ ]
65+ @validator = Proc . new { |result | true }
66+ @timeout = opts . fetch ( :timeout , TIMEOUT ) . freeze
67+ self . observers = CopyOnWriteObserverSet . new
68+ @one_by_one = OneByOne . new
69+ @task_executor = OptionsParser . get_task_executor_from ( opts )
70+ @operation_executor = OptionsParser . get_operation_executor_from ( opts )
6971 init_mutex
7072 set_deref_options ( opts )
7173 end
@@ -122,27 +124,39 @@ def validate(&block)
122124 alias_method :validate_with , :validate
123125 alias_method :validates_with , :validate
124126
125- # Update the current value with the result of the given block operation
127+ # Update the current value with the result of the given block operation,
128+ # block should not do blocking calls, use #post_off for blocking calls
126129 #
127130 # @yield the operation to be performed with the current value in order to calculate
128131 # the new value
129132 # @yieldparam [Object] value the current value
130133 # @yieldreturn [Object] the new value
131134 # @return [true, nil] nil when no block is given
132135 def post ( &block )
133- return nil if block . nil?
134- @executor . post { work ( &block ) }
135- true
136+ post_on ( @task_executor , &block )
137+ end
138+
139+ # Update the current value with the result of the given block operation,
140+ # block can do blocking calls
141+ #
142+ # @yield the operation to be performed with the current value in order to calculate
143+ # the new value
144+ # @yieldparam [Object] value the current value
145+ # @yieldreturn [Object] the new value
146+ # @return [true, nil] nil when no block is given
147+ def post_off ( &block )
148+ post_on ( @operation_executor , &block )
136149 end
137150
138- # Update the current value with the result of the given block operation
151+ # Update the current value with the result of the given block operation,
152+ # block should not do blocking calls, use #post_off for blocking calls
139153 #
140154 # @yield the operation to be performed with the current value in order to calculate
141155 # the new value
142156 # @yieldparam [Object] value the current value
143157 # @yieldreturn [Object] the new value
144158 def <<( block )
145- self . post ( &block )
159+ post ( &block )
146160 self
147161 end
148162
@@ -152,12 +166,18 @@ def <<(block)
152166 # @return [Boolean] false on timeout, true otherwise
153167 def await ( timeout = nil )
154168 done = Event . new
155- post { done . set }
169+ post { | val | done . set ; val }
156170 done . wait timeout
157171 end
158172
159173 private
160174
175+ def post_on ( executor , &block )
176+ return nil if block . nil?
177+ @one_by_one . post ( executor ) { work ( &block ) }
178+ true
179+ end
180+
161181 # @!visibility private
162182 Rescuer = Struct . new ( :clazz , :block ) # :nodoc:
163183
@@ -168,7 +188,6 @@ def try_rescue(ex) # :nodoc:
168188 end
169189 rescuer . block . call ( ex ) if rescuer
170190 rescue Exception => ex
171- # puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}"
172191 # supress
173192 end
174193
@@ -179,8 +198,8 @@ def work(&handler) # :nodoc:
179198 begin
180199 # FIXME creates second thread
181200 result , valid = Concurrent ::timeout ( @timeout ) do
182- [ result = handler . call ( value ) ,
183- validator . call ( result ) ]
201+ result = handler . call ( value )
202+ [ result , validator . call ( result ) ]
184203 end
185204 rescue Exception => ex
186205 exception = ex
0 commit comments