@@ -19,13 +19,13 @@ module Concurrent
1919 # and no blocking receive. The state of an Agent should be itself immutable
2020 # and the `#value` of an Agent is always immediately available for reading by
2121 # any thread without any messages, i.e. observation does not require
22- # cooperation or coordination.
22+ # cooperation or coordination.
2323 #
2424 # Agent action dispatches are made using the various `#send` methods. These
2525 # methods always return immediately. At some point later, in another thread,
2626 # the following will happen:
2727 #
28- # 1. The given `action` will be applied to the state of the Agent and the
28+ # 1. The given `action` will be applied to the state of the Agent and the
2929 # `args`, if any were supplied.
3030 # 2. The return value of `action` will be passed to the validator lambda,
3131 # if one has been set on the Agent.
@@ -55,7 +55,7 @@ module Concurrent
5555 # Unlike in Clojure, `Agent` cannot participate in `Concurrent::TVar` transactions.
5656 #
5757 # ## Example
58- #
58+ #
5959 # ```
6060 # def next_fibonacci(set = nil)
6161 # return [0, 1] if set.nil?
@@ -130,7 +130,7 @@ module Concurrent
130130 # ```
131131 #
132132 # @!macro [new] agent_await_warning
133- #
133+ #
134134 # **NOTE** Never, *under any circumstances*, call any of the "await" methods
135135 # ({#await}, {#await_for}, {#await_for!}, and {#wait}) from within an action
136136 # block/proc/lambda. The call will block the Agent and will always fail.
@@ -141,7 +141,7 @@ module Concurrent
141141 #
142142 # @see http://clojure.org/Agents Clojure Agents
143143 # @see http://clojure.org/state Values and Change - Clojure's approach to Identity and State
144- class Agent < Synchronization ::Object
144+ class Agent < Synchronization ::LockableObject
145145 include Concern ::Observable
146146
147147 ERROR_MODES = [ :continue , :fail ] . freeze
@@ -150,13 +150,13 @@ class Agent < Synchronization::Object
150150 AWAIT_FLAG = Object . new
151151 private_constant :AWAIT_FLAG
152152
153- AWAIT_ACTION = -> ( value , latch ) { latch . count_down ; AWAIT_FLAG }
153+ AWAIT_ACTION = -> ( value , latch ) { latch . count_down ; AWAIT_FLAG }
154154 private_constant :AWAIT_ACTION
155155
156- DEFAULT_ERROR_HANDLER = -> ( agent , error ) { nil }
156+ DEFAULT_ERROR_HANDLER = -> ( agent , error ) { nil }
157157 private_constant :DEFAULT_ERROR_HANDLER
158158
159- DEFAULT_VALIDATOR = -> ( value ) { true }
159+ DEFAULT_VALIDATOR = -> ( value ) { true }
160160 private_constant :DEFAULT_VALIDATOR
161161
162162 Job = Struct . new ( :action , :args , :executor , :caller )
@@ -226,6 +226,7 @@ def initialize(initial, opts = {})
226226 def value
227227 @current . value
228228 end
229+
229230 alias_method :deref , :value
230231
231232 # When {#failed?} and {#error_mode} is `:fail`, returns the error object
@@ -236,6 +237,7 @@ def value
236237 def error
237238 @error . value
238239 end
240+
239241 alias_method :reason , :error
240242
241243 # @!macro [attach] agent_send
@@ -289,6 +291,7 @@ def send!(*args, &action)
289291 def send_off ( *args , &action )
290292 enqueue_action_job ( action , args , Concurrent . global_io_executor )
291293 end
294+
292295 alias_method :post , :send_off
293296
294297 # @!macro agent_send
@@ -396,6 +399,7 @@ def wait(timeout = nil)
396399 def failed?
397400 !@error . value . nil?
398401 end
402+
399403 alias_method :stopped? , :failed?
400404
401405 # When an Agent is {#failed?}, changes the Agent {#value} to `new_value`
@@ -420,7 +424,7 @@ def restart(new_value, opts = {})
420424 raise Error . new ( 'agent is not failed' ) unless failed?
421425 raise ValidationError unless ns_validate ( new_value )
422426 @current . value = new_value
423- @error . value = nil
427+ @error . value = nil
424428 @queue . clear if clear_actions
425429 ns_post_next_job unless @queue . empty?
426430 end
@@ -440,7 +444,7 @@ class << self
440444 #
441445 # @!macro agent_await_warning
442446 def await ( *agents )
443- agents . each { |agent | agent . await }
447+ agents . each { |agent | agent . await }
444448 true
445449 end
446450
@@ -455,11 +459,11 @@ def await(*agents)
455459 # @!macro agent_await_warning
456460 def await_for ( timeout , *agents )
457461 end_at = Concurrent . monotonic_time + timeout . to_f
458- ok = agents . length . times do |i |
462+ ok = agents . length . times do |i |
459463 break false if ( delay = end_at - Concurrent . monotonic_time ) < 0
460464 break false unless agents [ i ] . await_for ( delay )
461465 end
462- !! ok
466+ !!ok
463467 end
464468
465469 # Blocks the current thread until all actions dispatched thus far to all
@@ -481,7 +485,7 @@ def await_for!(timeout, *agents)
481485 private
482486
483487 def ns_initialize ( initial , opts )
484- @error_mode = opts [ :error_mode ]
488+ @error_mode = opts [ :error_mode ]
485489 @error_handler = opts [ :error_handler ]
486490
487491 if @error_mode && !ERROR_MODES . include? ( @error_mode )
@@ -491,11 +495,11 @@ def ns_initialize(initial, opts)
491495 end
492496
493497 @error_handler ||= DEFAULT_ERROR_HANDLER
494- @validator = opts . fetch ( :validator , DEFAULT_VALIDATOR )
495- @current = Concurrent ::AtomicReference . new ( initial )
496- @error = Concurrent ::AtomicReference . new ( nil )
497- @caller = Concurrent ::ThreadLocalVar . new ( nil )
498- @queue = [ ]
498+ @validator = opts . fetch ( :validator , DEFAULT_VALIDATOR )
499+ @current = Concurrent ::AtomicReference . new ( initial )
500+ @error = Concurrent ::AtomicReference . new ( nil )
501+ @caller = Concurrent ::ThreadLocalVar . new ( nil )
502+ @queue = [ ]
499503
500504 self . observers = Collection ::CopyOnNotifyObserverSet . new
501505 end
@@ -530,15 +534,15 @@ def ns_enqueue_job(job, index = nil)
530534 end
531535
532536 def ns_post_next_job
533- @queue . first . executor . post { execute_next_job }
537+ @queue . first . executor . post { execute_next_job }
534538 end
535539
536540 def execute_next_job
537- job = synchronize { @queue . first }
541+ job = synchronize { @queue . first }
538542 old_value = @current . value
539543
540544 @caller . value = job . caller # for nested actions
541- new_value = job . action . call ( old_value , *job . args )
545+ new_value = job . action . call ( old_value , *job . args )
542546 @caller . value = nil
543547
544548 if new_value != AWAIT_FLAG && ns_validate ( new_value )
@@ -573,7 +577,7 @@ def handle_error(error)
573577 end
574578
575579 def ns_find_last_job_for_thread
576- @queue . rindex { |job | job . caller == Thread . current . object_id }
580+ @queue . rindex { |job | job . caller == Thread . current . object_id }
577581 end
578582 end
579583end
0 commit comments