@@ -76,12 +76,23 @@ def schedule(intended_time, default_executor = :io, &task)
7676 ScheduledPromise . new ( default_executor , intended_time ) . future . then ( &task )
7777 end
7878
79- # Constructs new {Future} which is completed after all futures are complete. Its value is array
80- # of dependent future values. If there is an error it fails with the first one.
81- # @param [Event] futures
79+ # Constructs new {Future} which is completed after all futures_and_or_events are complete. Its value is array
80+ # of dependent future values. If there is an error it fails with the first one. Event does not
81+ # have a value so it's represented by nil in the array of values.
82+ # @param [Event] futures_and_or_events
8283 # @return [Future]
83- def zip ( *futures )
84- ZipPromise . new ( futures , :io ) . future
84+ def zip_futures ( *futures_and_or_events )
85+ ZipFuturesPromise . new ( futures_and_or_events , :io ) . future
86+ end
87+
88+ alias_method :zip , :zip_futures
89+
90+ # Constructs new {Event} which is completed after all futures_and_or_events are complete
91+ # (Future is completed when Success or Failed).
92+ # @param [Event] futures_and_or_events
93+ # @return [Event]
94+ def zip_events ( *futures_and_or_events )
95+ ZipEventsPromise . new ( futures_and_or_events , :io ) . future
8596 end
8697
8798 # Constructs new {Future} which is completed after first of the futures is complete.
@@ -95,6 +106,7 @@ def any(*futures)
95106 # @return [Future]
96107 def select ( *channels )
97108 future do
109+ # noinspection RubyArgCount
98110 Channel . select do |s |
99111 channels . each do |ch |
100112 s . take ( ch ) { |value | [ value , ch ] }
@@ -503,9 +515,9 @@ def apply(block)
503515 # @!visibility private
504516 class PartiallyFailed < CompletedWithResult
505517 def initialize ( value , reason )
518+ super ( )
506519 @Value = value
507520 @Reason = reason
508- super ( )
509521 end
510522
511523 def success?
@@ -670,7 +682,7 @@ def schedule(intended_time)
670682 # Zips with selected value form the suplied channels
671683 # @return [Future]
672684 def then_select ( *channels )
673- ZipPromise . new ( [ self , Concurrent . select ( *channels ) ] , @DefaultExecutor ) . future
685+ ZipFuturesPromise . new ( [ self , Concurrent . select ( *channels ) ] , @DefaultExecutor ) . future
674686 end
675687
676688 # Changes default executor for rest of the chain
@@ -987,12 +999,16 @@ class InnerPromise < AbstractPromise
987999 # @abstract
9881000 # @!visibility private
9891001 class BlockedPromise < InnerPromise
1002+ def self . new ( *args , &block )
1003+ promise = super ( *args , &block )
1004+ promise . blocked_by . each { |f | f . add_callback :pr_callback_notify_blocked , promise }
1005+ promise
1006+ end
1007+
9901008 def initialize ( future , blocked_by_futures , countdown )
1009+ super ( future )
9911010 initialize_blocked_by ( blocked_by_futures )
9921011 @Countdown = AtomicFixnum . new countdown
993-
994- super ( future )
995- @BlockedBy . each { |f | f . add_callback :pr_callback_notify_blocked , self }
9961012 end
9971013
9981014 # @api private
@@ -1053,9 +1069,9 @@ def on_completable(done_future)
10531069 class BlockedTaskPromise < BlockedPromise
10541070 def initialize ( blocked_by_future , default_executor , executor , &task )
10551071 raise ArgumentError , 'no block given' unless block_given?
1072+ super Future . new ( self , default_executor ) , blocked_by_future , 1
10561073 @Executor = executor
10571074 @Task = task
1058- super Future . new ( self , default_executor ) , blocked_by_future , 1
10591075 end
10601076
10611077 def executor
@@ -1203,8 +1219,8 @@ def on_completable(done_future)
12031219 # @!visibility private
12041220 class ZipFutureEventPromise < BlockedPromise
12051221 def initialize ( future , event , default_executor )
1206- @FutureResult = future
12071222 super Future . new ( self , default_executor ) , [ future , event ] , 2
1223+ @FutureResult = future
12081224 end
12091225
12101226 def on_completable ( done_future )
@@ -1215,9 +1231,9 @@ def on_completable(done_future)
12151231 # @!visibility private
12161232 class ZipFutureFuturePromise < BlockedPromise
12171233 def initialize ( future1 , future2 , default_executor )
1234+ super Future . new ( self , default_executor ) , [ future1 , future2 ] , 2
12181235 @Future1Result = future1
12191236 @Future2Result = future2
1220- super Future . new ( self , default_executor ) , [ future1 , future2 ] , 2
12211237 end
12221238
12231239 def on_completable ( done_future )
@@ -1256,62 +1272,54 @@ def on_completable(done_future)
12561272 end
12571273
12581274 # @!visibility private
1259- class ZipPromise < BlockedPromise
1275+ class ZipFuturesPromise < BlockedPromise
12601276
12611277 private
12621278
12631279 def initialize ( blocked_by_futures , default_executor )
1264- klass = Event
1265- blocked_by_futures . each do |f |
1266- if f . is_a? ( Future )
1267- if klass == Event
1268- klass = Future
1269- break
1270- end
1271- end
1272- end
1273-
1274- # noinspection RubyArgCount
1275- super ( klass . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
1280+ super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
12761281
1277- if blocked_by_futures . empty?
1278- on_completable nil
1279- end
1282+ on_completable nil if blocked_by_futures . empty?
12801283 end
12811284
12821285 def on_completable ( done_future )
12831286 all_success = true
1284- values = [ ]
1285- reasons = [ ]
1286-
1287- blocked_by . each do |future |
1288- next unless future . is_a? ( Future )
1289- success , value , reason = future . result
1287+ values = Array . new ( blocked_by . size )
1288+ reasons = Array . new ( blocked_by . size )
12901289
1291- unless success
1292- all_success = false
1290+ blocked_by . each_with_index do |future , i |
1291+ if future . is_a? ( Future )
1292+ success , values [ i ] , reasons [ i ] = future . result
1293+ all_success &&= success
1294+ else
1295+ values [ i ] = reasons [ i ] = nil
12931296 end
1294-
1295- values << value
1296- reasons << reason
12971297 end
12981298
12991299 if all_success
1300- if values . empty?
1301- complete_with Event ::COMPLETED
1302- else
1303- if values . size == 1
1304- complete_with Future ::Success . new ( values . first )
1305- else
1306- complete_with Future ::SuccessArray . new ( values )
1307- end
1308- end
1300+ complete_with Future ::SuccessArray . new ( values )
13091301 else
13101302 complete_with Future ::PartiallyFailed . new ( values , reasons )
13111303 end
13121304 end
13131305 end
13141306
1307+ # @!visibility private
1308+ class ZipEventsPromise < BlockedPromise
1309+
1310+ private
1311+
1312+ def initialize ( blocked_by_futures , default_executor )
1313+ super ( Event . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
1314+
1315+ on_completable nil if blocked_by_futures . empty?
1316+ end
1317+
1318+ def on_completable ( done_future )
1319+ complete_with Event ::COMPLETED
1320+ end
1321+ end
1322+
13151323 # @!visibility private
13161324 class AnyPromise < BlockedPromise
13171325
@@ -1354,8 +1362,8 @@ def touch
13541362 private
13551363
13561364 def initialize ( default_executor , value )
1357- @Value = value
13581365 super Future . new ( self , default_executor )
1366+ @Value = value
13591367 end
13601368 end
13611369
@@ -1373,6 +1381,8 @@ def inspect
13731381 private
13741382
13751383 def initialize ( default_executor , intended_time )
1384+ super Event . new ( self , default_executor )
1385+
13761386 @IntendedTime = intended_time
13771387
13781388 in_seconds = begin
@@ -1385,8 +1395,6 @@ def initialize(default_executor, intended_time)
13851395 [ 0 , schedule_time . to_f - now . to_f ] . max
13861396 end
13871397
1388- super Event . new ( self , default_executor )
1389-
13901398 Concurrent . global_timer_set . post ( in_seconds ) do
13911399 @Future . complete_with Event ::COMPLETED
13921400 end
0 commit comments