File tree Expand file tree Collapse file tree 2 files changed +62
-6
lines changed Expand file tree Collapse file tree 2 files changed +62
-6
lines changed Original file line number Diff line number Diff line change 11require 'thread'
22
33require 'concurrent/actor_ref'
4+ require 'concurrent/event'
45require 'concurrent/ivar'
56
67module Concurrent
@@ -13,25 +14,25 @@ def initialize(actor, opts = {})
1314 @mutex = Mutex . new
1415 @queue = Queue . new
1516 @thread = nil
16- @stopped = false
17+ @stop_event = Event . new
1718 @abort_on_exception = opts . fetch ( :abort_on_exception , true )
1819 @reset_on_error = opts . fetch ( :reset_on_error , true )
1920 @exception_class = opts . fetch ( :rescue_exception , false ) ? Exception : StandardError
2021 @observers = CopyOnNotifyObserverSet . new
2122 end
2223
2324 def running?
24- @mutex . synchronize { @stopped == false }
25+ ! @stop_event . set?
2526 end
2627
2728 def shutdown?
28- @mutex . synchronize { @stopped == true }
29+ @stop_event . set?
2930 end
3031
3132 def post ( *msg , &block )
3233 raise ArgumentError . new ( 'message cannot be empty' ) if msg . empty?
3334 @mutex . synchronize do
34- supervise unless @stopped == true
35+ supervise unless shutdown?
3536 end
3637 ivar = IVar . new
3738 @queue . push ( Message . new ( msg , ivar , block ) )
@@ -52,15 +53,19 @@ def post!(seconds, *msg)
5253
5354 def shutdown
5455 @mutex . synchronize do
55- return if @stopped
56- @stopped = true
56+ return if shutdown?
5757 if @thread && @thread . alive?
5858 @thread . kill
5959 @actor . on_shutdown
6060 end
61+ @stop_event . set
6162 end
6263 end
6364
65+ def join ( timeout = nil )
66+ @stop_event . wait ( timeout )
67+ end
68+
6469 private
6570
6671 Message = Struct . new ( :payload , :ivar , :callback )
Original file line number Diff line number Diff line change @@ -169,6 +169,57 @@ def receive(*msg)
169169 end
170170 end
171171
172+ context '#join' do
173+
174+ it 'blocks until shutdown when no limit is given' do
175+ start = Time . now
176+ subject << :foo # start the actor's thread
177+ Thread . new { sleep ( 1 ) ; subject . shutdown }
178+ subject . join
179+ stop = Time . now
180+
181+ subject . should be_shutdown
182+ stop . should >= start + 1
183+ stop . should <= start + 2
184+ end
185+
186+ it 'blocks for no more than the given number of seconds' do
187+ start = Time . now
188+ subject << :foo # start the actor's thread
189+ Thread . new { sleep ( 5 ) ; subject . shutdown }
190+ subject . join ( 1 )
191+ stop = Time . now
192+
193+ stop . should >= start + 1
194+ stop . should <= start + 2
195+ end
196+
197+ it 'returns true when shutdown has completed before timeout' do
198+ subject << :foo # start the actor's thread
199+ Thread . new { sleep ( 1 ) ; subject . shutdown }
200+ subject . join . should be_true
201+ end
202+
203+ it 'returns false on timeout' do
204+ subject << :foo # start the actor's thread
205+ Thread . new { sleep ( 5 ) ; subject . shutdown }
206+ subject . join ( 1 ) . should be_false
207+ end
208+
209+ it 'returns immediately when already shutdown' do
210+ start = Time . now
211+ subject << :foo # start the actor's thread
212+ sleep ( 0.1 )
213+ subject . shutdown
214+ sleep ( 0.1 )
215+
216+ start = Time . now
217+ subject . join
218+ Time . now . should >= start
219+ Time . now . should <= start + 0.1
220+ end
221+ end
222+
172223 context 'observation' do
173224
174225 let ( :observer_class ) do
You can’t perform that action at this time.
0 commit comments