@@ -66,26 +66,7 @@ def dataflow(*inputs, &block)
6666 module_function :dataflow
6767
6868 def dataflow_with ( executor , *inputs , &block )
69- raise ArgumentError . new ( 'an executor must be provided' ) if executor . nil?
70- raise ArgumentError . new ( 'no block given' ) unless block_given?
71- raise ArgumentError . new ( 'not all dependencies are IVars' ) unless inputs . all? { |input | input . is_a? IVar }
72-
73- result = Future . new ( executor : executor ) do
74- values = inputs . map { |input | input . value }
75- block . call ( *values )
76- end
77-
78- if inputs . empty?
79- result . execute
80- else
81- counter = DependencyCounter . new ( inputs . size ) { result . execute }
82-
83- inputs . each do |input |
84- input . add_observer counter
85- end
86- end
87-
88- result
69+ call_dataflow ( :value , executor , *inputs , &block )
8970 end
9071 module_function :dataflow_with
9172
@@ -95,12 +76,19 @@ def dataflow!(*inputs, &block)
9576 module_function :dataflow!
9677
9778 def dataflow_with! ( executor , *inputs , &block )
79+ call_dataflow ( :value! , executor , *inputs , &block )
80+ end
81+ module_function :dataflow_with!
82+
83+ private
84+
85+ def call_dataflow ( method , executor , *inputs , &block )
9886 raise ArgumentError . new ( 'an executor must be provided' ) if executor . nil?
9987 raise ArgumentError . new ( 'no block given' ) unless block_given?
10088 raise ArgumentError . new ( 'not all dependencies are IVars' ) unless inputs . all? { |input | input . is_a? IVar }
10189
10290 result = Future . new ( executor : executor ) do
103- values = inputs . map { |input | input . value! }
91+ values = inputs . map { |input | input . send ( method ) }
10492 block . call ( *values )
10593 end
10694
@@ -116,5 +104,5 @@ def dataflow_with!(executor, *inputs, &block)
116104
117105 result
118106 end
119- module_function :dataflow_with!
107+ module_function :call_dataflow
120108end
0 commit comments