@@ -40,7 +40,7 @@ class Agent
4040 # is given at initialization
4141 TIMEOUT = 5
4242
43- attr_reader :timeout
43+ attr_reader :timeout , :executor
4444
4545 # Initialize a new Agent with the given initial value and provided options.
4646 #
@@ -60,12 +60,12 @@ class Agent
6060 # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6161 # returning the value returned from the proc
6262 def initialize ( initial , opts = { } )
63- @value = initial
64- @rescuers = [ ]
65- @validator = Proc . new { |result | true }
66- @timeout = opts . fetch ( :timeout , TIMEOUT ) . freeze
63+ @value = initial
64+ @rescuers = [ ]
65+ @validator = Proc . new { |result | true }
66+ @timeout = opts . fetch ( :timeout , TIMEOUT ) . freeze
6767 self . observers = CopyOnWriteObserverSet . new
68- @executor = OptionsParser ::get_executor_from ( opts )
68+ @executor = OneByOne . new OptionsParser ::get_executor_from ( opts )
6969 init_mutex
7070 set_deref_options ( opts )
7171 end
@@ -111,7 +111,11 @@ def rescue(clazz = StandardError, &block)
111111 # @yieldparam [Object] value the result of the last update operation
112112 # @yieldreturn [Boolean] true if the value is valid else false
113113 def validate ( &block )
114- @validator = block unless block . nil?
114+ unless block . nil?
115+ mutex . lock
116+ @validator = block
117+ mutex . unlock
118+ end
115119 self
116120 end
117121 alias_method :validates , :validate
@@ -124,8 +128,11 @@ def validate(&block)
124128 # the new value
125129 # @yieldparam [Object] value the current value
126130 # @yieldreturn [Object] the new value
131+ # @return [true, nil] nil when no block is given
127132 def post ( &block )
128- @executor . post { work ( &block ) } unless block . nil?
133+ return nil if block . nil?
134+ @executor . post { work ( &block ) }
135+ true
129136 end
130137
131138 # Update the current value with the result of the given block operation
@@ -139,6 +146,16 @@ def <<(block)
139146 self
140147 end
141148
149+ # Waits/blocks until all the updates sent before this call are done.
150+ #
151+ # @param [Numeric] timeout the maximum time in second to wait.
152+ # @return [Boolean] false on timeout, true otherwise
153+ def await ( timeout = nil )
154+ done = Event . new
155+ post { done . set }
156+ done . wait timeout
157+ end
158+
142159 private
143160
144161 # @!visibility private
@@ -147,33 +164,41 @@ def <<(block)
147164 # @!visibility private
148165 def try_rescue ( ex ) # :nodoc:
149166 rescuer = mutex . synchronize do
150- @rescuers . find { |r | ex . is_a? ( r . clazz ) }
167+ @rescuers . find { |r | ex . is_a? ( r . clazz ) }
151168 end
152169 rescuer . block . call ( ex ) if rescuer
153170 rescue Exception => ex
171+ # puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}"
154172 # supress
155173 end
156174
157175 # @!visibility private
158176 def work ( &handler ) # :nodoc:
177+ validator , value = mutex . synchronize { [ @validator , @value ] }
178+
159179 begin
180+ # FIXME creates second thread
181+ result , valid = Concurrent ::timeout ( @timeout ) do
182+ [ result = handler . call ( value ) ,
183+ validator . call ( result ) ]
184+ end
185+ rescue Exception => ex
186+ exception = ex
187+ end
160188
161- should_notify = false
189+ mutex . lock
190+ should_notify = if !exception && valid
191+ @value = result
192+ true
193+ end
194+ mutex . unlock
162195
163- mutex . synchronize do
164- result = Concurrent ::timeout ( @timeout ) do
165- handler . call ( @value )
166- end
167- if @validator . call ( result )
168- @value = result
169- should_notify = true
170- end
171- end
196+ if should_notify
172197 time = Time . now
173- observers . notify_observers { [ time , self . value ] } if should_notify
174- rescue Exception => ex
175- try_rescue ( ex )
198+ observers . notify_observers { [ time , self . value ] }
176199 end
200+
201+ try_rescue ( exception )
177202 end
178203 end
179204end
0 commit comments