11### Simple asynchronous task
22
33future = Concurrent . future { sleep 0.1 ; 1 + 1 } # evaluation starts immediately
4- # => <#Concurrent::Edge::Future:0x7fa1b59bbbb8 pending blocks:[]>
4+ # => <#Concurrent::Edge::Future:0x7fcc038ca588 pending blocks:[]>
55future . completed? # => false
66# block until evaluated
77future . value # => 2
1111### Failing asynchronous task
1212
1313future = Concurrent . future { raise 'Boom' }
14- # => <#Concurrent::Edge::Future:0x7fa1b59b2fe0 failed blocks:[]>
14+ # => <#Concurrent::Edge::Future:0x7fcc038c05b0 pending blocks:[]>
1515future . value # => nil
1616future . value! rescue $! # => #<RuntimeError: Boom>
1717future . reason # => #<RuntimeError: Boom>
2424head = Concurrent . future { 1 }
2525branch1 = head . then ( &:succ )
2626branch2 = head . then ( &:succ ) . then ( &:succ )
27- branch1 . zip ( branch2 ) . value # => [2, 3]
27+ branch1 . zip ( branch2 ) . value! # => [2, 3]
2828( branch1 & branch2 ) . then { |a , b | a + b } . value! # => 5
2929( branch1 & branch2 ) . then ( &:+ ) . value! # => 5
3030Concurrent . zip ( branch1 , branch2 , branch1 ) . then { |*values | values . reduce &:+ } . value!
3131 # => 7
3232# pick only first completed
33- ( branch1 | branch2 ) . value # => 2
33+ ( branch1 | branch2 ) . value! # => 2
34+
35+ # auto splat arrays for blocks
36+ Concurrent . future { [ 1 , 2 ] } . then ( &:+ ) . value! # => 3
3437
3538
3639### Error handling
4750
4851# will not evaluate until asked by #value or other method requiring completion
4952future = Concurrent . delay { 'lazy' }
50- # => <#Concurrent::Edge::Future:0x7fa1b5948f00 pending blocks:[]>
53+ # => <#Concurrent::Edge::Future:0x7fcc022afaa0 pending blocks:[]>
5154sleep 0.1
5255future . completed? # => false
5356future . value # => "lazy"
5457
5558# propagates trough chain allowing whole or partial lazy chains
5659
5760head = Concurrent . delay { 1 }
58- # => <#Concurrent::Edge::Future:0x7fa1b59401c0 pending blocks:[]>
61+ # => <#Concurrent::Edge::Future:0x7fcc022a5640 pending blocks:[]>
5962branch1 = head . then ( &:succ )
60- # => <#Concurrent::Edge::Future:0x7fa1b5933290 pending blocks:[]>
63+ # => <#Concurrent::Edge::Future:0x7fcc0229c478 pending blocks:[]>
6164branch2 = head . delay . then ( &:succ )
62- # => <#Concurrent::Edge::Future:0x7fa1b5930c98 pending blocks:[]>
65+ # => <#Concurrent::Edge::Future:0x7fcc0228f318 pending blocks:[]>
6366join = branch1 & branch2
64- # => <#Concurrent::Edge::ArrayFuture:0x7fa1b592b7c0 pending blocks:[]>
67+ # => <#Concurrent::Edge::Future:0x7fcc0228de78 pending blocks:[]>
6568
6669sleep 0.1 # nothing will complete # => 0
6770[ head , branch1 , branch2 , join ] . map ( &:completed? ) # => [false, false, false, false]
8992### Schedule
9093
9194scheduled = Concurrent . schedule ( 0.1 ) { 1 }
92- # => <#Concurrent::Edge::Future:0x7fa1b49df190 pending blocks:[]>
95+ # => <#Concurrent::Edge::Future:0x7fcc0385b368 pending blocks:[]>
9396
9497scheduled . completed? # => false
9598scheduled . value # available after 0.1sec # => 1
9699
97100# and in chain
98101scheduled = Concurrent . delay { 1 } . schedule ( 0.1 ) . then ( &:succ )
99- # => <#Concurrent::Edge::Future:0x7fa1b49d4ec0 pending blocks:[]>
102+ # => <#Concurrent::Edge::Future:0x7fcc03843948 pending blocks:[]>
100103# will not be scheduled until value is requested
101104sleep 0.1
102105scheduled . value # returns after another 0.1sec # => 2
105108### Completable Future and Event
106109
107110future = Concurrent . future
108- # => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 pending blocks:[]>
111+ # => <#Concurrent::Edge::CompletableFuture:0x7fcc0304fdb8 pending blocks:[]>
109112event = Concurrent . event
110- # => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 pending blocks:[]>
113+ # => <#Concurrent::Edge::CompletableEvent:0x7fcc0304e210 pending blocks:[]>
111114
112115# will be blocked until completed
113116t1 = Thread . new { future . value }
114117t2 = Thread . new { event . wait }
115118
116119future . success 1
117- # => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 success blocks:[]>
120+ # => <#Concurrent::Edge::CompletableFuture:0x7fcc0304fdb8 success blocks:[]>
118121future . success 1 rescue $!
119122 # => #<Concurrent::MultipleAssignmentError: multiple assignment>
120123future . try_success 2 # => false
121124event . complete
122- # => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 completed blocks:[]>
125+ # => <#Concurrent::Edge::CompletableEvent:0x7fcc0304e210 completed blocks:[]>
123126
124127[ t1 , t2 ] . each &:join
125128
126129
127130### Callbacks
128131
129- queue = Queue . new # => #<Thread::Queue:0x007fa1b49acec0 >
132+ queue = Queue . new # => #<Thread::Queue:0x007fcc03101720 >
130133future = Concurrent . delay { 1 + 1 }
131- # => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
134+ # => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
132135
133136future . on_success { queue << 1 } # evaluated asynchronously
134- # => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
137+ # => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
135138future . on_success! { queue << 2 } # evaluated on completing thread
136- # => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
139+ # => <#Concurrent::Edge::Future:0x7fcc030fbaf0 pending blocks:[]>
137140
138141queue . empty? # => true
139142future . value # => 2
144147### Thread-pools
145148
146149Concurrent . future ( :fast ) { 2 } . then ( :io ) { File . read __FILE__ } . wait
147- # => <#Concurrent::Edge::Future:0x7fa1b498dd18 success blocks:[]>
150+ # => <#Concurrent::Edge::Future:0x7fcc030e8bd0 success blocks:[]>
148151
149152
150153### Interoperability with actors
151154
152155actor = Concurrent ::Actor ::Utils ::AdHoc . spawn :square do
153156 -> v { v ** 2 }
154157end
155- # => #<Concurrent::Actor::Reference /square (Concurrent::Actor::Utils::AdHoc)>
158+ # => #<Concurrent::Actor::Reference:0x7fcc0223f020 /square (Concurrent::Actor::Utils::AdHoc)>
156159
157160Concurrent .
158161 future { 2 } .
165168
166169### Interoperability with channels
167170
168- ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fa1b41c5e28 >
169- ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fa1b41c5338 >
171+ ch1 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fcc021ec6b8 >
172+ ch2 = Concurrent ::Edge ::Channel . new # => #<Concurrent::Edge::Channel:0x007fcc021e7b18 >
170173
171174result = Concurrent . select ( ch1 , ch2 )
172- # => <#Concurrent::Edge::CompletableFuture:0x7fa1b41bf4d8 pending blocks:[]>
175+ # => <#Concurrent::Edge::CompletableFuture:0x7fcc021e6060 pending blocks:[]>
173176ch1 . push 1 # => nil
174177result . value!
175- # => [1, #<Concurrent::Edge::Channel:0x007fa1b41c5e28 >]
178+ # => [1, #<Concurrent::Edge::Channel:0x007fcc021ec6b8 >]
176179
177180Concurrent .
178181 future { 1 +1 } .
179182 then_push ( ch1 )
180- # => <#Concurrent::Edge::Future:0x7fa1b41b6450 pending blocks:[]>
183+ # => <#Concurrent::Edge::Future:0x7fcc021dc3d0 pending blocks:[]>
181184result = Concurrent .
182185 future { '%02d' } .
183186 then_select ( ch1 , ch2 ) .
184187 then { |format , ( value , channel ) | format format , value }
185- # => <#Concurrent::Edge::Future:0x7fa1b41a7f40 pending blocks:[]>
188+ # => <#Concurrent::Edge::Future:0x7fcc021cd9e8 pending blocks:[]>
186189result . value! # => "02"
187190
188191
189192### Common use-cases Examples
190193
191194# simple background processing
192195Concurrent . future { do_stuff }
193- # => <#Concurrent::Edge::Future:0x7fa1b4196d08 pending blocks:[]>
196+ # => <#Concurrent::Edge::Future:0x7fcc021b7530 pending blocks:[]>
194197
195198# parallel background processing
196199jobs = 10 . times . map { |i | Concurrent . future { i } }
@@ -207,7 +210,7 @@ def schedule_job
207210end # => :schedule_job
208211
209212schedule_job
210- # => <#Concurrent::Edge::Future:0x7fa1b4147960 pending blocks:[]>
213+ # => <#Concurrent::Edge::Future:0x7fcc0218faf8 pending blocks:[]>
211214@end = true # => true
212215
213216
@@ -220,7 +223,7 @@ def schedule_job
220223 data [ message ]
221224 end
222225end
223- # => #<Concurrent::Actor::Reference /db (Concurrent::Actor::Utils::AdHoc)>
226+ # => #<Concurrent::Actor::Reference:0x7fcc0214f458 /db (Concurrent::Actor::Utils::AdHoc)>
224227
225228concurrent_jobs = 11 . times . map do |v |
226229 Concurrent .
@@ -243,14 +246,14 @@ def schedule_job
243246
244247DB_POOL = Concurrent ::Actor ::Utils ::Pool . spawn! ( 'DB-pool' , pool_size ) do |index |
245248 # DB connection constructor
246- Concurrent ::Actor ::Utils ::AdHoc . spawn ( name : "worker-#{ index } " , args : [ data ] ) do
249+ Concurrent ::Actor ::Utils ::AdHoc . spawn ( name : "worker-#{ index } " , args : [ data ] ) do | data |
247250 lambda do |message |
248251 # pretending that this queries a DB
249252 data [ message ]
250253 end
251254 end
252255end
253- # => #<Concurrent::Actor::Reference /DB-pool (Concurrent::Actor::Utils::Pool)>
256+ # => #<Concurrent::Actor::Reference:0x7fcc02930398 /DB-pool (Concurrent::Actor::Utils::Pool)>
254257
255258concurrent_jobs = 11 . times . map do |v |
256259 Concurrent .
0 commit comments