Skip to content

Commit 9267c95

Browse files
bensheldoneregon
authored andcommitted
Allow TimerTask to be initialized with a specified Executor
The resulting ScheduledTasks will be initialized with that executor.
1 parent 53a67a1 commit 9267c95

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

lib/concurrent-ruby/concurrent/timer_task.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ class TimerTask < RubyExecutorService
164164
# @option opts [Boolean] :run_now Whether to run the task immediately
165165
# upon instantiation or to wait until the first # execution_interval
166166
# has passed (default: false)
167+
# @option opts [Executor] executor, default is `global_io_executor`
167168
#
168169
# @!macro deref_options
169170
#
@@ -268,7 +269,8 @@ def ns_initialize(opts, &task)
268269
warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly'
269270
end
270271
@run_now = opts[:now] || opts[:run_now]
271-
@executor = Concurrent::SafeTaskExecutor.new(task)
272+
@task = Concurrent::SafeTaskExecutor.new(task)
273+
@executor = opts[:executor] || Concurrent.global_io_executor
272274
@running = Concurrent::AtomicBoolean.new(false)
273275
@value = nil
274276

@@ -289,14 +291,14 @@ def ns_kill_execution
289291

290292
# @!visibility private
291293
def schedule_next_task(interval = execution_interval)
292-
ScheduledTask.execute(interval, args: [Concurrent::Event.new], &method(:execute_task))
294+
ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new], &method(:execute_task))
293295
nil
294296
end
295297

296298
# @!visibility private
297299
def execute_task(completion)
298300
return nil unless @running.true?
299-
_success, value, reason = @executor.execute(self)
301+
_success, value, reason = @task.execute(self)
300302
if completion.try?
301303
self.value = value
302304
schedule_next_task

spec/concurrent/timer_task_spec.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def trigger_observable(observable)
8282
subject = TimerTask.new(execution_interval: 5) { nil }
8383
expect(subject.execution_interval).to eq 5
8484
end
85+
8586
end
8687

8788
context '#kill' do
@@ -181,6 +182,33 @@ def trigger_observable(observable)
181182
expect(latch.count).to eq(0)
182183
subject.kill
183184
end
185+
186+
it 'uses the global executor by default' do
187+
executor = Concurrent::ImmediateExecutor.new
188+
allow(Concurrent).to receive(:global_io_executor).and_return(executor)
189+
allow(executor).to receive(:post).and_call_original
190+
191+
latch = CountDownLatch.new(1)
192+
subject = TimerTask.new(execution_interval: 0.1, run_now: true) { latch.count_down }
193+
subject.execute
194+
expect(latch.wait(1)).to be_truthy
195+
subject.kill
196+
197+
expect(executor).to have_received(:post)
198+
end
199+
200+
it 'uses a custom executor when given' do
201+
executor = Concurrent::ImmediateExecutor.new
202+
allow(executor).to receive(:post).and_call_original
203+
204+
latch = CountDownLatch.new(1)
205+
subject = TimerTask.new(execution_interval: 0.1, run_now: true, executor: executor) { latch.count_down }
206+
subject.execute
207+
expect(latch.wait(1)).to be_truthy
208+
subject.kill
209+
210+
expect(executor).to have_received(:post)
211+
end
184212
end
185213

186214
context 'observation' do

0 commit comments

Comments
 (0)