@@ -22,10 +22,10 @@ class TimerSet
2222 # @option opts [object] :executor when provided will run all operations on
2323 # this executor rather than the global thread pool (overrides :operation)
2424 def initialize ( opts = { } )
25- @queue = PriorityQueue . new ( order : :min )
26- @task_executor = OptionsParser ::get_executor_from ( opts ) || Concurrent . configuration . global_task_pool
25+ @queue = PriorityQueue . new ( order : :min )
26+ @task_executor = OptionsParser ::get_executor_from ( opts ) || Concurrent . configuration . global_task_pool
2727 @timer_executor = SingleThreadExecutor . new
28- @condition = Condition . new
28+ @condition = Condition . new
2929 init_executor
3030 end
3131
@@ -64,7 +64,7 @@ def post(intended_time, *args, &task)
6464 # For a timer, #kill is like an orderly shutdown, except we need to manually
6565 # (and destructively) clear the queue first
6666 def kill
67- @queue . clear
67+ mutex . synchronize { @queue . clear }
6868 shutdown
6969 end
7070
@@ -124,14 +124,13 @@ def shutdown_execution
124124 # @!visibility private
125125 def process_tasks
126126 loop do
127- break if @queue . empty?
128-
129- task = @queue . peek
127+ task = mutex . synchronize { @queue . peek }
128+ break unless task
130129 interval = task . time - Time . now . to_f
131130
132131 if interval <= 0
133132 @task_executor . post ( *task . args , &task . op )
134- @queue . pop
133+ mutex . synchronize { @queue . pop }
135134 else
136135 mutex . synchronize do
137136 @condition . wait ( mutex , [ interval , 60 ] . min )
0 commit comments