99module Concurrent
1010
1111 # A very common currency pattern is to run a thread that performs a task at regular
12- # intervals. The thread that peforms the task sleeps for the given interval then
12+ # intervals. The thread that performs the task sleeps for the given interval then
1313 # wakes up and performs the task. Lather, rinse, repeat... This pattern causes two
14- # problems. First, it is difficult to test the business logic of the task becuse the
14+ # problems. First, it is difficult to test the business logic of the task because the
1515 # task itself is tightly coupled with the concurrency logic. Second, an exception in
1616 # raised while performing the task can cause the entire thread to abend. In a
1717 # long-running application where the task thread is intended to run for days/weeks/years
@@ -25,7 +25,7 @@ module Concurrent
2525 # performing logging or ancillary operations. +TimerTask+ can also be configured with a
2626 # timeout value allowing it to kill a task that runs too long.
2727 #
28- # One other advantage of +TimerTask+ is it forces the bsiness logic to be completely decoupled
28+ # One other advantage of +TimerTask+ is it forces the business logic to be completely decoupled
2929 # from the concurrency logic. The business logic can be tested separately then passed to the
3030 # +TimerTask+ for scheduling and running.
3131 #
@@ -147,7 +147,6 @@ class TimerTask
147147 include Dereferenceable
148148 include Runnable
149149 include Stoppable
150- include Observable
151150
152151 # Default +:execution_interval+
153152 EXECUTION_INTERVAL = 60
@@ -171,7 +170,7 @@ class TimerTask
171170 # @option opts [Integer] :timeout_interval number of seconds a task can
172171 # run before it is considered to have failed (default: TIMEOUT_INTERVAL)
173172 # @option opts [Boolean] :run_now Whether to run the task immediately
174- # upon instanciation or to wait until the first #execution_interval
173+ # upon instantiation or to wait until the first #execution_interval
175174 # has passed (default: false)
176175 #
177176 # @raise ArgumentError when no block is given.
@@ -193,9 +192,10 @@ def initialize(opts = {}, &block)
193192
194193 self . execution_interval = opts [ :execution ] || opts [ :execution_interval ] || EXECUTION_INTERVAL
195194 self . timeout_interval = opts [ :timeout ] || opts [ :timeout_interval ] || TIMEOUT_INTERVAL
196- @run_now = opts [ :now ] || opts [ :run_now ] || false
195+ @run_now = opts [ :now ] || opts [ :run_now ]
197196
198197 @task = block
198+ @observers = CopyOnWriteObserverSet . new
199199 init_mutex
200200 set_deref_options ( opts )
201201 end
@@ -226,6 +226,10 @@ def timeout_interval=(value)
226226 @timeout_interval = value
227227 end
228228
229+ def add_observer ( observer , func = :update )
230+ @observers . add_observer ( observer , func )
231+ end
232+
229233 # Terminate with extreme prejudice. Useful in cases where +#stop+ doesn't
230234 # work because one of the threads becomes unresponsive.
231235 #
@@ -278,11 +282,10 @@ def execute_task # :nodoc:
278282 end
279283 raise TimeoutError if @worker . join ( @timeout_interval ) . nil?
280284 mutex . synchronize { @value = @worker [ :result ] }
281- rescue Exception => ex
282- # suppress
285+ rescue Exception => e
286+ ex = e
283287 ensure
284- changed
285- notify_observers ( Time . now , self . value , ex )
288+ @observers . notify_observers ( Time . now , self . value , ex )
286289 unless @worker . nil?
287290 Thread . kill ( @worker )
288291 @worker = nil
0 commit comments