@@ -5,43 +5,77 @@ module Concurrent
55 describe 'dataflow' do
66
77 let ( :executor ) { ImmediateExecutor . new }
8-
9- before ( :each ) do
10- Concurrent . configure do |config |
11- config . global_task_pool = Concurrent ::PerThreadExecutor . new
12- end
13- end
8+ let ( :root_executor ) { PerThreadExecutor . new }
149
1510 it 'raises an exception when no block given' do
1611 expect { Concurrent ::dataflow } . to raise_error ( ArgumentError )
12+ expect { Concurrent ::dataflow_with ( root_executor ) } . to raise_error ( ArgumentError )
13+ end
14+
15+ specify '#dataflow uses the global task pool' do
16+ input = Future . execute { 0 }
17+ Concurrent . should_receive ( :dataflow_with ) . once .
18+ with ( Concurrent . configuration . global_task_pool , input )
19+ Concurrent ::dataflow ( input ) { 0 }
20+ end
21+
22+ specify '#dataflow_with uses the given executor' do
23+ input = Future . execute { 0 }
24+ result = Future . new { 0 }
25+
26+ Future . should_receive ( :new ) . with ( executor : root_executor ) . and_return ( result )
27+ Concurrent ::dataflow_with ( root_executor , input ) { 0 }
28+ end
29+
30+ specify '#dataflow_with raises an exception when no executor given' do
31+ expect {
32+ Concurrent ::dataflow_with ( nil ) { nil }
33+ } . to raise_error ( ArgumentError )
1734 end
1835
1936 it 'accepts zero or more dependencies' do
2037 Concurrent ::dataflow ( ) { 0 }
2138 Concurrent ::dataflow ( Future . execute { 0 } ) { 0 }
2239 Concurrent ::dataflow ( Future . execute { 0 } , Future . execute { 0 } ) { 0 }
40+
41+ Concurrent ::dataflow_with ( root_executor , ) { 0 }
42+ Concurrent ::dataflow_with ( root_executor , Future . execute { 0 } ) { 0 }
43+ Concurrent ::dataflow_with ( root_executor , Future . execute { 0 } , Future . execute { 0 } ) { 0 }
2344 end
2445
2546 it 'accepts uncompleted dependencies' do
2647 d = Future . new ( executor : executor ) { 0 }
2748 Concurrent ::dataflow ( d ) { 0 }
2849 d . execute
50+
51+ d = Future . new ( executor : executor ) { 0 }
52+ Concurrent ::dataflow_with ( root_executor , d ) { 0 }
53+ d . execute
2954 end
3055
3156 it 'accepts completed dependencies' do
3257 d = Future . new ( executor : executor ) { 0 }
3358 d . execute
3459 Concurrent ::dataflow ( d ) { 0 }
60+
61+ d = Future . new ( executor : executor ) { 0 }
62+ d . execute
63+ Concurrent ::dataflow_with ( root_executor , d ) { 0 }
3564 end
3665
3766 it 'raises an exception if any dependencies are not IVars' do
3867 expect { Concurrent ::dataflow ( nil ) } . to raise_error ( ArgumentError )
3968 expect { Concurrent ::dataflow ( Future . execute { 0 } , nil ) } . to raise_error ( ArgumentError )
4069 expect { Concurrent ::dataflow ( nil , Future . execute { 0 } ) } . to raise_error ( ArgumentError )
70+
71+ expect { Concurrent ::dataflow_with ( root_executor , nil ) } . to raise_error ( ArgumentError )
72+ expect { Concurrent ::dataflow_with ( root_executor , Future . execute { 0 } , nil ) } . to raise_error ( ArgumentError )
73+ expect { Concurrent ::dataflow_with ( root_executor , nil , Future . execute { 0 } ) } . to raise_error ( ArgumentError )
4174 end
4275
4376 it 'returns a Future' do
4477 Concurrent ::dataflow { 0 } . should be_a ( Future )
78+ Concurrent ::dataflow { 0 } . should be_a ( Future )
4579 end
4680
4781 context 'does not schedule the Future' do
@@ -51,6 +85,11 @@ module Concurrent
5185 f = Concurrent ::dataflow ( d ) { 0 }
5286 f . should be_unscheduled
5387 d . execute
88+
89+ d = Future . new ( executor : executor ) { 0 }
90+ f = Concurrent ::dataflow_with ( root_executor , d ) { 0 }
91+ f . should be_unscheduled
92+ d . execute
5493 end
5594
5695 specify 'if one dependency of two is completed' do
@@ -60,6 +99,13 @@ module Concurrent
6099 d1 . execute
61100 f . should be_unscheduled
62101 d2 . execute
102+
103+ d1 = Future . new ( executor : executor ) { 0 }
104+ d2 = Future . new ( executor : executor ) { 0 }
105+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { 0 }
106+ d1 . execute
107+ f . should be_unscheduled
108+ d2 . execute
63109 end
64110 end
65111
@@ -70,6 +116,11 @@ module Concurrent
70116 f = Concurrent ::dataflow ( d ) { 0 }
71117 d . execute
72118 f . value . should eq 0
119+
120+ d = Future . new ( executor : executor ) { 0 }
121+ f = Concurrent ::dataflow_with ( root_executor , d ) { 0 }
122+ d . execute
123+ f . value . should eq 0
73124 end
74125
75126 specify 'if there is more than one' do
@@ -79,6 +130,13 @@ module Concurrent
79130 d1 . execute
80131 d2 . execute
81132 f . value . should eq 0
133+
134+ d1 = Future . new ( executor : executor ) { 0 }
135+ d2 = Future . new ( executor : executor ) { 0 }
136+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { 0 }
137+ d1 . execute
138+ d2 . execute
139+ f . value . should eq 0
82140 end
83141 end
84142
@@ -89,6 +147,11 @@ module Concurrent
89147 d . execute
90148 f = Concurrent ::dataflow ( d ) { 0 }
91149 f . value . should eq 0
150+
151+ d = Future . new ( executor : executor ) { 0 }
152+ d . execute
153+ f = Concurrent ::dataflow_with ( root_executor , d ) { 0 }
154+ f . value . should eq 0
92155 end
93156
94157 specify 'if there is more than one' do
@@ -98,26 +161,41 @@ module Concurrent
98161 d2 . execute
99162 f = Concurrent ::dataflow ( d1 , d2 ) { 0 }
100163 f . value . should eq 0
164+
165+ d1 = Future . new ( executor : executor ) { 0 }
166+ d2 = Future . new ( executor : executor ) { 0 }
167+ d1 . execute
168+ d2 . execute
169+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { 0 }
170+ f . value . should eq 0
101171 end
102172 end
103173
104174 context 'passes the values of dependencies into the block' do
105175
106176 specify 'if there is just one' do
107177 d = Future . new ( executor : executor ) { 14 }
108- f = Concurrent ::dataflow ( d ) do |v |
109- v
110- end
178+ f = Concurrent ::dataflow ( d ) { |v | v }
179+ d . execute
180+ f . value . should eq 14
181+
182+ d = Future . new ( executor : executor ) { 14 }
183+ f = Concurrent ::dataflow_with ( root_executor , d ) { |v | v }
111184 d . execute
112185 f . value . should eq 14
113186 end
114187
115188 specify 'if there is more than one' do
116189 d1 = Future . new ( executor : executor ) { 14 }
117190 d2 = Future . new ( executor : executor ) { 2 }
118- f = Concurrent ::dataflow ( d1 , d2 ) do |v1 , v2 |
119- v1 + v2
120- end
191+ f = Concurrent ::dataflow ( d1 , d2 ) { |v1 , v2 | v1 + v2 }
192+ d1 . execute
193+ d2 . execute
194+ f . value . should eq 16
195+
196+ d1 = Future . new ( executor : executor ) { 14 }
197+ d2 = Future . new ( executor : executor ) { 2 }
198+ f = Concurrent ::dataflow_with ( root_executor , d1 , d2 ) { |v1 , v2 | v1 + v2 }
121199 d1 . execute
122200 d2 . execute
123201 f . value . should eq 16
@@ -126,15 +204,15 @@ module Concurrent
126204
127205 context 'module function' do
128206
129- it 'can be called as Concurrent.dataflow' do
207+ it 'can be called as Concurrent.dataflow and Concurrent.dataflow_with ' do
130208
131209 def fib_with_dot ( n )
132210 if n < 2
133211 Concurrent . dataflow { n }
134212 else
135213 n1 = fib_with_dot ( n - 1 )
136214 n2 = fib_with_dot ( n - 2 )
137- Concurrent . dataflow ( n1 , n2 ) { n1 . value + n2 . value }
215+ Concurrent . dataflow_with ( root_executor , n1 , n2 ) { n1 . value + n2 . value }
138216 end
139217 end
140218
@@ -143,15 +221,15 @@ def fib_with_dot(n)
143221 expected . value . should eq 377
144222 end
145223
146- it 'can be called as Concurrent::dataflow' do
224+ it 'can be called as Concurrent::dataflow and Concurrent::dataflow_with ' do
147225
148226 def fib_with_colons ( n )
149227 if n < 2
150228 Concurrent ::dataflow { n }
151229 else
152230 n1 = fib_with_colons ( n - 1 )
153231 n2 = fib_with_colons ( n - 2 )
154- Concurrent ::dataflow ( n1 , n2 ) { n1 . value + n2 . value }
232+ Concurrent ::dataflow_with ( root_executor , n1 , n2 ) { n1 . value + n2 . value }
155233 end
156234 end
157235
0 commit comments