File tree Expand file tree Collapse file tree 3 files changed +20
-3
lines changed
lib/concurrent/actor/utils Expand file tree Collapse file tree 3 files changed +20
-3
lines changed Original file line number Diff line number Diff line change @@ -25,14 +25,15 @@ def on_message(message)
2525 when :subscribed?
2626 @receivers . include? envelope . sender
2727 else
28- @buffer << message
28+ @buffer << envelope
2929 distribute
30+ Behaviour ::MESSAGE_PROCESSED
3031 end
3132 end
3233
3334 def distribute
3435 while !@receivers . empty? && !@buffer . empty?
35- @receivers . shift << @buffer . shift
36+ redirect @receivers . shift , @buffer . shift
3637 end
3738 end
3839 end
Original file line number Diff line number Diff line change @@ -34,7 +34,7 @@ def initialize(size, &worker_initializer)
3434 end
3535
3636 def on_message ( message )
37- @balancer << message
37+ redirect @balancer
3838 end
3939 end
4040
Original file line number Diff line number Diff line change @@ -377,6 +377,22 @@ def on_message(message)
377377
378378 end
379379
380+ describe 'pool' do
381+ it 'supports asks' do
382+ worker = Class . new Concurrent ::Actor ::Utils ::AbstractWorker do
383+ def work ( message )
384+ 5 + message
385+ end
386+ end
387+
388+ pool = Concurrent ::Actor ::Utils ::Pool . spawn! 'pool' , 5 do |balancer , index |
389+ worker . spawn name : "worker-#{ index } " , supervise : true , args : [ balancer ]
390+ end
391+
392+ expect ( pool . ask! ( 5 ) ) . to eq 10
393+ terminate_actors pool
394+ end
395+ end
380396
381397 end
382398 end
You can’t perform that action at this time.
0 commit comments