|
1 | 1 | ### Simple asynchronous task |
2 | 2 |
|
3 | 3 | future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately |
4 | | - # => <#Concurrent::Edge::Future:0x7fa08385da60 pending blocks:[]> |
| 4 | + # => <#Concurrent::Edge::Future:0x7fad9ca186e0 pending blocks:[]> |
5 | 5 | future.completed? # => false |
6 | 6 | # block until evaluated |
7 | 7 | future.value # => 2 |
|
11 | 11 | ### Failing asynchronous task |
12 | 12 |
|
13 | 13 | future = Concurrent.future { raise 'Boom' } |
14 | | - # => <#Concurrent::Edge::Future:0x7fa083834638 failed blocks:[]> |
| 14 | + # => <#Concurrent::Edge::Future:0x7fad9c9f95b0 pending blocks:[]> |
15 | 15 | future.value # => nil |
16 | 16 | future.value! rescue $! # => #<RuntimeError: Boom> |
17 | 17 | future.reason # => #<RuntimeError: Boom> |
|
26 | 26 | branch2 = head.then(&:succ).then(&:succ) |
27 | 27 | branch1.zip(branch2).value # => [2, 3] |
28 | 28 | (branch1 & branch2).then { |(a, b)| a + b }.value |
29 | | - # => 5 |
| 29 | + # => nil |
30 | 30 | # pick only first completed |
31 | 31 | (branch1 | branch2).value # => 2 |
32 | 32 |
|
|
44 | 44 | ### Delay |
45 | 45 |
|
46 | 46 | # will not evaluate until asked by #value or other method requiring completion |
47 | | -scheduledfuture = Concurrent.delay { 'lazy' } |
48 | | - # => <#Concurrent::Edge::Future:0x7fa0831917b8 pending blocks:[]> |
| 47 | +future = Concurrent.delay { 'lazy' } |
| 48 | + # => <#Concurrent::Edge::Future:0x7fad9c8fb3e8 pending blocks:[]> |
49 | 49 | sleep 0.1 |
50 | | -future.completed? # => true |
51 | | -future.value # => nil |
| 50 | +future.completed? # => false |
| 51 | +future.value # => "lazy" |
52 | 52 |
|
53 | 53 | # propagates trough chain allowing whole or partial lazy chains |
54 | 54 |
|
55 | 55 | head = Concurrent.delay { 1 } |
56 | | - # => <#Concurrent::Edge::Future:0x7fa083172ef8 pending blocks:[]> |
| 56 | + # => <#Concurrent::Edge::Future:0x7fad9b158bf0 pending blocks:[]> |
57 | 57 | branch1 = head.then(&:succ) |
58 | | - # => <#Concurrent::Edge::Future:0x7fa083171c88 pending blocks:[]> |
| 58 | + # => <#Concurrent::Edge::Future:0x7fad9b149ba0 pending blocks:[]> |
59 | 59 | branch2 = head.delay.then(&:succ) |
60 | | - # => <#Concurrent::Edge::Future:0x7fa08294f528 pending blocks:[]> |
| 60 | + # => <#Concurrent::Edge::Future:0x7fad9b12a020 pending blocks:[]> |
61 | 61 | join = branch1 & branch2 |
62 | | - # => <#Concurrent::Edge::Future:0x7fa08294e218 pending blocks:[]> |
| 62 | + # => <#Concurrent::Edge::ArrayFuture:0x7fad9b8a0778 pending blocks:[]> |
63 | 63 |
|
64 | 64 | sleep 0.1 # nothing will complete # => 0 |
65 | 65 | [head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false] |
|
87 | 87 | ### Schedule |
88 | 88 |
|
89 | 89 | scheduled = Concurrent.schedule(0.1) { 1 } |
90 | | - # => <#Concurrent::Edge::Future:0x7fa08224edf0 pending blocks:[]> |
| 90 | + # => <#Concurrent::Edge::Future:0x7fad9a941e08 pending blocks:[]> |
91 | 91 |
|
92 | 92 | scheduled.completed? # => false |
93 | 93 | scheduled.value # available after 0.1sec # => 1 |
94 | 94 |
|
95 | 95 | # and in chain |
96 | 96 | scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ) |
97 | | - # => <#Concurrent::Edge::Future:0x7fa0831f3d50 pending blocks:[]> |
| 97 | + # => <#Concurrent::Edge::Future:0x7fad9b0aa7d0 pending blocks:[]> |
98 | 98 | # will not be scheduled until value is requested |
99 | 99 | sleep 0.1 |
100 | 100 | scheduled.value # returns after another 0.1sec # => 2 |
|
103 | 103 | ### Completable Future and Event |
104 | 104 |
|
105 | 105 | future = Concurrent.future |
106 | | - # => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 pending blocks:[]> |
| 106 | + # => <#Concurrent::Edge::CompletableFuture:0x7fad9a87b6e0 pending blocks:[]> |
107 | 107 | event = Concurrent.event |
108 | | - # => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 pending blocks:[]> |
| 108 | + # => <#Concurrent::Edge::CompletableEvent:0x7fad9a86ba88 pending blocks:[]> |
109 | 109 |
|
110 | 110 | # will be blocked until completed |
111 | 111 | t1 = Thread.new { future.value } |
112 | 112 | t2 = Thread.new { event.wait } |
113 | 113 |
|
114 | 114 | future.success 1 |
115 | | - # => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 success blocks:[]> |
| 115 | + # => <#Concurrent::Edge::CompletableFuture:0x7fad9a87b6e0 success blocks:[]> |
116 | 116 | future.success 1 rescue $! |
117 | 117 | # => #<Concurrent::MultipleAssignmentError: multiple assignment> |
118 | 118 | future.try_success 2 # => false |
119 | 119 | event.complete |
120 | | - # => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 completed blocks:[]> |
| 120 | + # => <#Concurrent::Edge::CompletableEvent:0x7fad9a86ba88 completed blocks:[]> |
121 | 121 |
|
122 | 122 | [t1, t2].each &:join |
123 | 123 |
|
124 | 124 |
|
125 | 125 | ### Callbacks |
126 | 126 |
|
127 | | -queue = Queue.new # => #<Thread::Queue:0x007fa0831bac30> |
| 127 | +queue = Queue.new # => #<Thread::Queue:0x007fad9a862320> |
128 | 128 | future = Concurrent.delay { 1 + 1 } |
129 | | - # => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]> |
| 129 | + # => <#Concurrent::Edge::Future:0x7fad9a853960 pending blocks:[]> |
130 | 130 |
|
131 | 131 | future.on_success { queue << 1 } # evaluated asynchronously |
132 | | - # => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]> |
| 132 | + # => <#Concurrent::Edge::Future:0x7fad9a853960 pending blocks:[]> |
133 | 133 | future.on_success! { queue << 2 } # evaluated on completing thread |
134 | | - # => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]> |
| 134 | + # => <#Concurrent::Edge::Future:0x7fad9a853960 pending blocks:[]> |
135 | 135 |
|
136 | 136 | queue.empty? # => true |
137 | 137 | future.value # => 2 |
|
142 | 142 | ### Thread-pools |
143 | 143 |
|
144 | 144 | Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait |
145 | | - # => <#Concurrent::Edge::Future:0x7fa08318b070 success blocks:[]> |
| 145 | + # => <#Concurrent::Edge::Future:0x7fad9a883958 success blocks:[]> |
146 | 146 |
|
147 | 147 |
|
148 | 148 | ### Interoperability with actors |
|
161 | 161 | actor.ask(2).then(&:succ).value # => 5 |
162 | 162 |
|
163 | 163 |
|
| 164 | +### Interoperability with channels |
| 165 | + |
| 166 | +ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fad9c892ac8> |
| 167 | +ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fad9c8904a8> |
| 168 | + |
| 169 | +result = Concurrent.select(ch1, ch2) |
| 170 | + # => <#Concurrent::Edge::CompletableFuture:0x7fad9b86aa88 pending blocks:[]> |
| 171 | +ch1.push 1 # => nil |
| 172 | +result.value! |
| 173 | + # => [1, #<Concurrent::Edge::Channel:0x007fad9c892ac8>] |
| 174 | + |
| 175 | +Concurrent. |
| 176 | + future { 1+1 }. |
| 177 | + then_push(ch1) |
| 178 | + # => <#Concurrent::Edge::Future:0x7fad9c898d88 pending blocks:[]> |
| 179 | +result = Concurrent. |
| 180 | + future { '%02d' }. |
| 181 | + then_select(ch1, ch2). |
| 182 | + then { |format, (value, channel)| format format, value } |
| 183 | + # => <#Concurrent::Edge::Future:0x7fad9b88b4e0 pending blocks:[]> |
| 184 | +result.value! # => "02" |
| 185 | + |
| 186 | + |
164 | 187 | ### Common use-cases Examples |
165 | 188 |
|
166 | 189 | # simple background processing |
167 | 190 | Concurrent.future { do_stuff } |
168 | | - # => <#Concurrent::Edge::Future:0x7fa0839ee8e8 pending blocks:[]> |
| 191 | + # => <#Concurrent::Edge::Future:0x7fad9b151b98 pending blocks:[]> |
169 | 192 |
|
170 | 193 | # parallel background processing |
171 | 194 | jobs = 10.times.map { |i| Concurrent.future { i } } |
172 | 195 | Concurrent.zip(*jobs).value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] |
173 | 196 |
|
174 | 197 |
|
175 | 198 | # periodic task |
| 199 | +@end = false # => false |
| 200 | + |
176 | 201 | def schedule_job |
177 | 202 | Concurrent.schedule(1) { do_stuff }. |
178 | 203 | rescue { |e| report_error e }. |
179 | | - then { schedule_job } |
| 204 | + then { schedule_job unless @end } |
180 | 205 | end # => :schedule_job |
181 | 206 |
|
182 | 207 | schedule_job |
183 | | - # => <#Concurrent::Edge::Future:0x7fa082904f78 pending blocks:[]> |
184 | | - |
185 | | - |
| 208 | + # => <#Concurrent::Edge::Future:0x7fad9c96a6a8 pending blocks:[]> |
| 209 | +@end = true # => true |
| 210 | + |
| 211 | + |
| 212 | +# How to limit processing where there are limited resources? |
| 213 | +# By creating an actor managing the resource |
| 214 | +DB = Concurrent::Actor::Utils::AdHoc.spawn :db do |
| 215 | + data = Array.new(10) { |i| '*' * i } |
| 216 | + lambda do |message| |
| 217 | + # pretending that this queries a DB |
| 218 | + data[message] |
| 219 | + end |
| 220 | +end |
| 221 | + # => #<Concurrent::Actor::Reference /db (Concurrent::Actor::Utils::AdHoc)> |
| 222 | + |
| 223 | +concurrent_jobs = 11.times.map do |v| |
| 224 | + Concurrent. |
| 225 | + future { v }. |
| 226 | + # ask the DB with the `v`, only one at the time, rest is parallel |
| 227 | + then_ask(DB). |
| 228 | + # get size of the string, fails for 11 |
| 229 | + then(&:size). |
| 230 | + rescue { |reason| reason.message } # translate error to value (exception, message) |
| 231 | +end |
| 232 | + |
| 233 | +Concurrent.zip(*concurrent_jobs).value! |
| 234 | + # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "undefined method `size' for nil:NilClass"] |
| 235 | + |
| 236 | + |
| 237 | +# In reality there is often a pool though: |
| 238 | +class DBConnection < Concurrent::Actor::Utils::AbstractWorker |
| 239 | + def initialize(balancer, data) |
| 240 | + super balancer |
| 241 | + @data = data |
| 242 | + end |
| 243 | + |
| 244 | + def work(message) |
| 245 | + # pretending that this queries a DB |
| 246 | + @data[message] |
| 247 | + end |
| 248 | +end # => :work |
| 249 | + |
| 250 | +data = Array.new(10) { |i| '*' * i } |
| 251 | + # => ["", "*", "**", "***", "****", "*****", "******", "*******", "********", "*********"] |
| 252 | +pool_size = 5 # => 5 |
| 253 | + |
| 254 | +DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |balancer, index| |
| 255 | + DBConnection.spawn(name: "worker-#{index}", args: [balancer, data]) |
| 256 | +end |
| 257 | + # => #<Concurrent::Actor::Reference /DB-pool (Concurrent::Actor::Utils::Pool)> |
| 258 | + |
| 259 | +concurrent_jobs = 11.times.map do |v| |
| 260 | + Concurrent. |
| 261 | + future { v }. |
| 262 | + # ask the DB_POOL with the `v`, only 5 at the time, rest is parallel |
| 263 | + then_ask(DB_POOL). |
| 264 | + then(&:size). |
| 265 | + rescue { |reason| reason.message } |
| 266 | +end |
| 267 | + |
| 268 | +Concurrent.zip(*concurrent_jobs).value! |
| 269 | + # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "undefined method `size' for nil:NilClass"] |
0 commit comments