Skip to content

Commit 18f96b7

Browse files
authored
Merge pull request #840 from fzakaria/ISSUE-839
Introduce DaemonThreadFactory
2 parents f749b81 + 86bf86c commit 18f96b7

File tree

6 files changed

+67
-11
lines changed

6 files changed

+67
-11
lines changed

lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,13 @@ def initialize(opts = {})
5151
def ns_initialize(opts)
5252
super(opts)
5353
if Concurrent.on_jruby?
54+
self.auto_terminate = opts.fetch(:auto_terminate, true)
5455
@max_queue = 0
55-
@executor = java.util.concurrent.Executors.newCachedThreadPool
56+
@executor = java.util.concurrent.Executors.newCachedThreadPool(
57+
Concurrent::DaemonThreadFactory.new(self.auto_terminate?)
58+
)
5659
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
5760
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
58-
self.auto_terminate = opts.fetch(:auto_terminate, true)
5961
end
6062
end
6163
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
module Concurrent
2+
3+
class DaemonThreadFactory
4+
include java.util.concurrent.ThreadFactory
5+
6+
def initialize(daemonize = true)
7+
@daemonize = daemonize
8+
end
9+
10+
def newThread(runnable)
11+
thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable)
12+
thread.setDaemon(@daemonize)
13+
return thread
14+
end
15+
end
16+
17+
end

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,17 @@ def ns_initialize(opts)
108108
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
109109
end
110110

111+
self.auto_terminate = opts.fetch(:auto_terminate, true)
112+
111113
@executor = java.util.concurrent.ThreadPoolExecutor.new(
112114
min_length,
113115
max_length,
114116
idletime,
115117
java.util.concurrent.TimeUnit::SECONDS,
116118
queue,
119+
Concurrent::DaemonThreadFactory.new(self.auto_terminate?),
117120
FALLBACK_POLICY_CLASSES[@fallback_policy].new)
118121

119-
self.auto_terminate = opts.fetch(:auto_terminate, true)
120122
end
121123
end
122124
end

lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ module Concurrent
55

66
if Concurrent.on_jruby?
77
require 'concurrent/executor/java_thread_pool_executor'
8+
require 'concurrent/executor/java_daemon_thread_factory'
89
end
910

1011
ThreadPoolExecutorImplementation = case

spec/concurrent/executor/cached_thread_pool_spec.rb

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ module Concurrent
4545
end
4646

4747
it 'returns zero while running' do
48-
10.times{ subject.post{ nil } }
48+
10.times { subject.post { nil } }
4949
subject.post { latch.count_down }
5050
latch.wait(0.1)
5151
expect(subject.min_length).to eq 0
5252
end
5353

5454
it 'returns zero once shutdown' do
55-
10.times{ subject.post{ nil } }
55+
10.times { subject.post { nil } }
5656
subject.post { latch.count_down }
5757
latch.wait(0.1)
5858
subject.shutdown
@@ -68,14 +68,14 @@ module Concurrent
6868
end
6969

7070
it 'returns :max_length while running' do
71-
10.times{ subject.post{ nil } }
71+
10.times { subject.post { nil } }
7272
subject.post { latch.count_down }
7373
latch.wait(0.1)
7474
expect(subject.max_length).to eq described_class::DEFAULT_MAX_POOL_SIZE
7575
end
7676

7777
it 'returns :max_length once shutdown' do
78-
10.times{ subject.post{ nil } }
78+
10.times { subject.post { nil } }
7979
subject.post { latch.count_down }
8080
latch.wait(0.1)
8181
subject.shutdown
@@ -91,14 +91,14 @@ module Concurrent
9191
end
9292

9393
it 'returns a non-zero number once tasks have been received' do
94-
10.times{ subject.post{ sleep(0.1) } }
94+
10.times { subject.post { sleep(0.1) } }
9595
subject.post { latch.count_down }
9696
latch.wait(0.1)
9797
expect(subject.largest_length).to be > 0
9898
end
9999

100100
it 'returns a non-zero number after shutdown if tasks have been received' do
101-
10.times{ subject.post{ sleep(0.1) } }
101+
10.times { subject.post { sleep(0.1) } }
102102
subject.post { latch.count_down }
103103
latch.wait(0.1)
104104
subject.shutdown
@@ -109,7 +109,7 @@ module Concurrent
109109

110110
context '#idletime' do
111111

112-
subject{ described_class.new(idletime: 42) }
112+
subject { described_class.new(idletime: 42) }
113113

114114
it 'returns the thread idletime' do
115115
expect(subject.idletime).to eq 42
@@ -123,7 +123,7 @@ module Concurrent
123123
context '#initialize' do
124124

125125
it 'sets :fallback_policy correctly' do
126-
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
126+
clazz = java.util.concurrent.ThreadPoolExecutor::DiscardPolicy
127127
policy = clazz.new
128128
expect(clazz).to receive(:new).at_least(:once).with(any_args).and_return(policy)
129129

@@ -201,6 +201,26 @@ module Concurrent
201201
end
202202
end
203203

204+
context 'auto terminate' do
205+
206+
# https://github.com/ruby-concurrency/concurrent-ruby/issues/817
207+
# https://github.com/ruby-concurrency/concurrent-ruby/issues/839
208+
it 'does not stop shutdown ' do
209+
Timeout.timeout(10) do
210+
begin
211+
test_file = File.join File.dirname(__FILE__), 'pool_quits.rb'
212+
pid = spawn RbConfig.ruby, test_file
213+
Process.waitpid pid
214+
expect($?.success?).to eq true
215+
rescue Timeout::Error => e
216+
Process.kill :KILL, pid
217+
raise e
218+
end
219+
end
220+
end
221+
222+
end
223+
204224
context 'stress', notravis: true do
205225
configurations = [
206226
{ min_threads: 2,
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
2+
lib = File.expand_path '../../../lib/concurrent-ruby/'
3+
$LOAD_PATH.push lib unless $LOAD_PATH.include? lib
4+
5+
require 'concurrent-ruby'
6+
7+
# the test relies on replicating that Minitest messed up the AtExit handling
8+
Concurrent.disable_at_exit_handlers!
9+
pool = Concurrent::CachedThreadPool.new
10+
pool.post do
11+
sleep # sleep indefinitely
12+
end
13+
14+
# the process main thread should quit out which should kill the daemon CachedThreadPool

0 commit comments

Comments
 (0)