11### Simple asynchronous task
22
33future = Concurrent . future { sleep 0.1 ; 1 + 1 } # evaluation starts immediately
4- # => <#Concurrent::Edge::Future:0x7f9f549266e8 pending blocks:[]>
4+ # => <#Concurrent::Edge::Future:0x7fa1b59bbbb8 pending blocks:[]>
55future . completed? # => false
66# block until evaluated
77future . value # => 2
1111### Failing asynchronous task
1212
1313future = Concurrent . future { raise 'Boom' }
14- # => <#Concurrent::Edge::Future:0x7f9f548dd3f8 pending blocks:[]>
14+ # => <#Concurrent::Edge::Future:0x7fa1b59b2fe0 failed blocks:[]>
1515future . value # => nil
1616future . value! rescue $! # => #<RuntimeError: Boom>
1717future . reason # => #<RuntimeError: Boom>
2323
2424head = Concurrent . future { 1 }
2525branch1 = head . then ( &:succ )
26- branch2 = head . then ( &:succ ) . then ( &:succ )
27- # zipping futures
26+ branch2 = head . then ( &:succ ) . then ( &:succ )
2827branch1 . zip ( branch2 ) . value # => [2, 3]
2928( branch1 & branch2 ) . then { |a , b | a + b } . value! # => 5
3029( branch1 & branch2 ) . then ( &:+ ) . value! # => 5
4847
4948# will not evaluate until asked by #value or other method requiring completion
5049future = Concurrent . delay { 'lazy' }
51- # => <#Concurrent::Edge::Future:0x7f9f53a57b58 pending blocks:[]>
50+ # => <#Concurrent::Edge::Future:0x7fa1b5948f00 pending blocks:[]>
5251sleep 0.1
5352future . completed? # => false
5453future . value # => "lazy"
5554
5655# propagates trough chain allowing whole or partial lazy chains
5756
5857head = Concurrent . delay { 1 }
59- # => <#Concurrent::Edge::Future:0x7f9f53a35508 pending blocks:[]>
58+ # => <#Concurrent::Edge::Future:0x7fa1b59401c0 pending blocks:[]>
6059branch1 = head . then ( &:succ )
61- # => <#Concurrent::Edge::Future:0x7f9f53a17328 pending blocks:[]>
60+ # => <#Concurrent::Edge::Future:0x7fa1b5933290 pending blocks:[]>
6261branch2 = head . delay . then ( &:succ )
63- # => <#Concurrent::Edge::Future:0x7f9f5590d868 pending blocks:[]>
62+ # => <#Concurrent::Edge::Future:0x7fa1b5930c98 pending blocks:[]>
6463join = branch1 & branch2
65- # => <#Concurrent::Edge::ArrayFuture:0x7f9f558fd7d8 pending blocks:[]>
64+ # => <#Concurrent::Edge::ArrayFuture:0x7fa1b592b7c0 pending blocks:[]>
6665
6766sleep 0.1 # nothing will complete # => 0
6867[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [false, false, false, false]
9089### Schedule
9190
9291scheduled = Concurrent . schedule ( 0.1 ) { 1 }
93- # => <#Concurrent::Edge::Future:0x7f9f5581ed30 pending blocks:[]>
92+ # => <#Concurrent::Edge::Future:0x7fa1b49df190 pending blocks:[]>
9493
9594scheduled . completed? # => false
9695scheduled . value # available after 0.1sec # => 1
9796
9897# and in chain
9998scheduled = Concurrent . delay { 1 } . schedule ( 0.1 ) . then ( &:succ )
100- # => <#Concurrent::Edge::Future:0x7f9f548494c8 pending blocks:[]>
99+ # => <#Concurrent::Edge::Future:0x7fa1b49d4ec0 pending blocks:[]>
101100# will not be scheduled until value is requested
102101sleep 0.1
103102scheduled . value # returns after another 0.1sec # => 2
106105### Completable Future and Event
107106
108107future = Concurrent . future
109- # => <#Concurrent::Edge::CompletableFuture:0x7f9f5385bcf0 pending blocks:[]>
108+ # => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 pending blocks:[]>
110109event = Concurrent . event
111- # => <#Concurrent::Edge::CompletableEvent:0x7f9f53858ca8 pending blocks:[]>
110+ # => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 pending blocks:[]>
112111
113112# will be blocked until completed
114113t1 = Thread . new { future . value }
115114t2 = Thread . new { event . wait }
116115
117116future . success 1
118- # => <#Concurrent::Edge::CompletableFuture:0x7f9f5385bcf0 success blocks:[]>
117+ # => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 success blocks:[]>
119118future . success 1 rescue $!
120119 # => #<Concurrent::MultipleAssignmentError: multiple assignment>
121120future . try_success 2 # => false
122121event . complete
123- # => <#Concurrent::Edge::CompletableEvent:0x7f9f53858ca8 completed blocks:[]>
122+ # => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 completed blocks:[]>
124123
125124[ t1 , t2 ] . each &:join
126125
127126
128127### Callbacks
129128
130- queue = Queue . new # => #<Thread::Queue:0x007f9f53861b50 >
129+ queue = Queue . new # => #<Thread::Queue:0x007fa1b49acec0 >
131130future = Concurrent . delay { 1 + 1 }
132- # => <#Concurrent::Edge::Future:0x7f9f54853d10 pending blocks:[]>
131+ # => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
133132
134133future . on_success { queue << 1 } # evaluated asynchronously
135- # => <#Concurrent::Edge::Future:0x7f9f54853d10 pending blocks:[]>
134+ # => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
136135future . on_success! { queue << 2 } # evaluated on completing thread
137- # => <#Concurrent::Edge::Future:0x7f9f54853d10 pending blocks:[]>
136+ # => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
138137
139138queue . empty? # => true
140139future . value # => 2
145144### Thread-pools
146145
147146Concurrent . future ( :fast ) { 2 } . then ( :io ) { File . read __FILE__ } . wait
148- # => <#Concurrent::Edge::Future:0x7f9f539545f8 success blocks:[]>
147+ # => <#Concurrent::Edge::Future:0x7fa1b498dd18 success blocks:[]>
149148
150149
151150### Interoperability with actors
166165
167166### Interoperability with channels
168167
169- ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007f9f558e5db8 >
170- ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007f9f558e4f80 >
168+ ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fa1b41c5e28 >
169+ ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fa1b41c5338 >
171170
172171result = Concurrent . select ( ch1 , ch2 )
173- # => <#Concurrent::Edge::CompletableFuture:0x7f9f53a4fe58 pending blocks:[]>
172+ # => <#Concurrent::Edge::CompletableFuture:0x7fa1b41bf4d8 pending blocks:[]>
174173ch1 . push 1 # => nil
175174result . value!
176- # => [1, #<Concurrent::Edge::Channel:0x007f9f558e5db8 >]
175+ # => [1, #<Concurrent::Edge::Channel:0x007fa1b41c5e28 >]
177176
178177Concurrent .
179178 future { 1 +1 } .
180179 then_push ( ch1 )
181- # => <#Concurrent::Edge::Future:0x7f9f53a2e758 pending blocks:[]>
180+ # => <#Concurrent::Edge::Future:0x7fa1b41b6450 pending blocks:[]>
182181result = Concurrent .
183182 future { '%02d' } .
184183 then_select ( ch1 , ch2 ) .
185184 then { |format , ( value , channel ) | format format , value }
186- # => <#Concurrent::Edge::Future:0x7f9f55925f30 pending blocks:[]>
185+ # => <#Concurrent::Edge::Future:0x7fa1b41a7f40 pending blocks:[]>
187186result . value! # => "02"
188187
189188
190189### Common use-cases Examples
191190
192191# simple background processing
193192Concurrent . future { do_stuff }
194- # => <#Concurrent::Edge::Future:0x7f9f53a93e50 pending blocks:[]>
193+ # => <#Concurrent::Edge::Future:0x7fa1b4196d08 pending blocks:[]>
195194
196195# parallel background processing
197196jobs = 10 . times . map { |i | Concurrent . future { i } }
@@ -208,7 +207,7 @@ def schedule_job
208207end # => :schedule_job
209208
210209schedule_job
211- # => <#Concurrent::Edge::Future:0x7f9f548b01a0 pending blocks:[]>
210+ # => <#Concurrent::Edge::Future:0x7fa1b4147960 pending blocks:[]>
212211@end = true # => true
213212
214213
@@ -238,24 +237,18 @@ def schedule_job
238237
239238
240239# In reality there is often a pool though:
241- class DBConnection < Concurrent ::Actor ::Utils ::AbstractWorker
242- def initialize ( balancer , data )
243- super balancer
244- @data = data
245- end
246-
247- def work ( message )
248- # pretending that this queries a DB
249- @data [ message ]
250- end
251- end # => :work
252-
253240data = Array . new ( 10 ) { |i | '*' * i }
254241 # => ["", "*", "**", "***", "****", "*****", "******", "*******", "********", "*********"]
255242pool_size = 5 # => 5
256243
257- DB_POOL = Concurrent ::Actor ::Utils ::Pool . spawn! ( 'DB-pool' , pool_size ) do |balancer , index |
258- DBConnection . spawn ( name : "worker-#{ index } " , args : [ balancer , data ] )
244+ DB_POOL = Concurrent ::Actor ::Utils ::Pool . spawn! ( 'DB-pool' , pool_size ) do |index |
245+ # DB connection constructor
246+ Concurrent ::Actor ::Utils ::AdHoc . spawn ( name : "worker-#{ index } " , args : [ data ] ) do
247+ lambda do |message |
248+ # pretending that this queries a DB
249+ data [ message ]
250+ end
251+ end
259252end
260253 # => #<Concurrent::Actor::Reference /DB-pool (Concurrent::Actor::Utils::Pool)>
261254
0 commit comments