1- require 'thread'
21require 'concurrent/collection/copy_on_write_observer_set'
32require 'concurrent/concern/dereferenceable'
43require 'concurrent/concern/observable'
54require 'concurrent/concern/logging'
65require 'concurrent/executor/executor'
7- require 'concurrent/concern/deprecation '
6+ require 'concurrent/synchronization '
87
98module Concurrent
109
@@ -81,11 +80,10 @@ module Concurrent
8180 # @return [Fixnum] the maximum number of seconds before an update is cancelled
8281 #
8382 # @!macro edge_warning
84- class Agent
83+ class Agent < Synchronization :: Object
8584 include Concern ::Dereferenceable
8685 include Concern ::Observable
8786 include Concern ::Logging
88- include Concern ::Deprecation
8987
9088 attr_reader :timeout , :io_executor , :fast_executor
9189
@@ -95,15 +93,8 @@ class Agent
9593 #
9694 # @!macro executor_and_deref_options
9795 def initialize ( initial , opts = { } )
98- @value = initial
99- @rescuers = [ ]
100- @validator = Proc . new { |result | true }
101- self . observers = Collection ::CopyOnWriteObserverSet . new
102- @serialized_execution = SerializedExecution . new
103- @io_executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
104- @fast_executor = Executor . executor_from_options ( opts ) || Concurrent . global_fast_executor
105- init_mutex
106- set_deref_options ( opts )
96+ super ( )
97+ synchronize { ns_initialize ( initial , opts ) }
10798 end
10899
109100 # Specifies a block fast to be performed when an update fast raises
@@ -129,17 +120,15 @@ def initialize(initial, opts = {})
129120 # #=> puts "Pow!"
130121 def rescue ( clazz = StandardError , &block )
131122 unless block . nil?
132- mutex . synchronize do
133- @rescuers << Rescuer . new ( clazz , block )
134- end
123+ synchronize { @rescuers << Rescuer . new ( clazz , block ) }
135124 end
136125 self
137126 end
138127
139128 alias_method :catch , :rescue
140129 alias_method :on_error , :rescue
141130
142- # A block fast to be performed after every update to validate if the new
131+ # A block task to be performed after every update to validate if the new
143132 # value is valid. If the new value is not valid then the current value is not
144133 # updated. If no validator is provided then all updates are considered valid.
145134 #
@@ -150,12 +139,7 @@ def rescue(clazz = StandardError, &block)
150139 def validate ( &block )
151140
152141 unless block . nil?
153- begin
154- mutex . lock
155- @validator = block
156- ensure
157- mutex . unlock
158- end
142+ synchronize { @validator = block }
159143 end
160144 self
161145 end
@@ -179,30 +163,13 @@ def post(&block)
179163 # Update the current value with the result of the given block fast,
180164 # block can do blocking calls
181165 #
182- # @param [Fixnum, nil] timeout [DEPRECATED] maximum number of seconds before an update is cancelled
183- #
184166 # @yield the fast to be performed with the current value in order to calculate
185167 # the new value
186168 # @yieldparam [Object] value the current value
187169 # @yieldreturn [Object] the new value
188170 # @return [true, nil] nil when no block is given
189- def post_off ( timeout = nil , &block )
190- task = if timeout
191- deprecated 'post_off with option timeout options is deprecated and will be removed'
192- lambda do |value |
193- future = Future . execute do
194- block . call ( value )
195- end
196- if future . wait ( timeout )
197- future . value!
198- else
199- raise Concurrent ::TimeoutError
200- end
201- end
202- else
203- block
204- end
205- post_on ( @io_executor , &task )
171+ def post_off ( &block )
172+ post_on ( @io_executor , &block )
206173 end
207174
208175 # Update the current value with the result of the given block fast,
@@ -227,6 +194,20 @@ def await(timeout = nil)
227194 done . wait timeout
228195 end
229196
197+ protected
198+
199+ def ns_initialize ( initial , opts )
200+ init_mutex ( self )
201+ @value = initial
202+ @rescuers = [ ]
203+ @validator = Proc . new { |result | true }
204+ self . observers = Collection ::CopyOnWriteObserverSet . new
205+ @serialized_execution = SerializedExecution . new
206+ @io_executor = Executor . executor_from_options ( opts ) || Concurrent . global_io_executor
207+ @fast_executor = Executor . executor_from_options ( opts ) || Concurrent . global_fast_executor
208+ set_deref_options ( opts )
209+ end
210+
230211 private
231212
232213 def post_on ( executor , &block )
@@ -240,7 +221,7 @@ def post_on(executor, &block)
240221
241222 # @!visibility private
242223 def try_rescue ( ex ) # :nodoc:
243- rescuer = mutex . synchronize do
224+ rescuer = synchronize do
244225 @rescuers . find { |r | ex . is_a? ( r . clazz ) }
245226 end
246227 rescuer . block . call ( ex ) if rescuer
@@ -251,7 +232,7 @@ def try_rescue(ex) # :nodoc:
251232
252233 # @!visibility private
253234 def work ( &handler ) # :nodoc:
254- validator , value = mutex . synchronize { [ @validator , @value ] }
235+ validator , value = synchronize { [ @validator , @value ] }
255236
256237 begin
257238 result = handler . call ( value )
@@ -260,14 +241,11 @@ def work(&handler) # :nodoc:
260241 exception = ex
261242 end
262243
263- begin
264- mutex . lock
265- should_notify = if !exception && valid
266- @value = result
267- true
268- end
269- ensure
270- mutex . unlock
244+ should_notify = synchronize do
245+ if !exception && valid
246+ @value = result
247+ true
248+ end
271249 end
272250
273251 if should_notify
0 commit comments