@@ -672,8 +672,9 @@ def resolve_with(state, raise_on_reassign = true)
672672 # @!visibility private
673673 # @return [Array<AbstractPromise>]
674674 def blocks
675- @Callbacks . each_with_object ( [ ] ) do |callback , promises |
676- promises . push ( *( callback . select { |v | v . is_a? AbstractPromise } ) )
675+ # TODO (pitr-ch 18-Dec-2016): add macro noting that debug methods may change api without warning
676+ @Callbacks . each_with_object ( [ ] ) do |( method , *args ) , promises |
677+ promises . push ( args [ 0 ] ) if method == :callback_notify_blocked
677678 end
678679 end
679680
@@ -759,8 +760,8 @@ def async_callback_on_resolution(state, executor, args, callback)
759760 end
760761 end
761762
762- def callback_notify_blocked ( state , promise )
763- promise . on_resolution self
763+ def callback_notify_blocked ( state , promise , index )
764+ promise . on_resolution self , index
764765 end
765766 end
766767
@@ -985,7 +986,7 @@ def rescue_on(executor, *args, &task)
985986 # @return [Future]
986987 def zip ( other )
987988 if other . is_a? ( Future )
988- ZipFutureFuturePromise . new_blocked ( [ self , other ] , self , other , @DefaultExecutor ) . future
989+ ZipFuturesPromise . new_blocked ( [ self , other ] , [ self , other ] , @DefaultExecutor ) . future
989990 else
990991 ZipFutureEventPromise . new_blocked ( [ self , other ] , self , other , @DefaultExecutor ) . future
991992 end
@@ -1009,10 +1010,10 @@ def any(event_or_future)
10091010 #
10101011 # @return [Future]
10111012 def delay
1012- future = DelayPromise . new ( @DefaultExecutor ) . future
1013- ZipFutureEventPromise . new_blocked ( [ self , future ] ,
1013+ event = DelayPromise . new ( @DefaultExecutor ) . event
1014+ ZipFutureEventPromise . new_blocked ( [ self , event ] ,
10141015 self ,
1015- future ,
1016+ event ,
10161017 @DefaultExecutor ) . future
10171018 end
10181019
@@ -1373,7 +1374,7 @@ class BlockedPromise < InnerPromise
13731374 def self . new_blocked ( blockers , *args , &block )
13741375 promise = new ( *args , &block )
13751376 ensure
1376- blockers . each { |f | f . add_callback :callback_notify_blocked , promise }
1377+ blockers . each_with_index { |f , i | f . add_callback :callback_notify_blocked , promise , i }
13771378 end
13781379
13791380 def initialize ( future , blocked_by_futures , countdown )
@@ -1383,12 +1384,13 @@ def initialize(future, blocked_by_futures, countdown)
13831384 end
13841385
13851386 # @!visibility private
1386- def on_resolution ( future )
1387- countdown = process_on_resolution ( future )
1388- resolvable = resolvable? ( countdown , future )
1387+ def on_resolution ( future , index )
1388+ # TODO (pitr-ch 18-Dec-2016): rename to on_blocker_resolution
1389+ countdown = process_on_resolution ( future , index )
1390+ resolvable = resolvable? ( countdown , future , index )
13891391
13901392 if resolvable
1391- on_resolvable ( future )
1393+ on_resolvable ( future , index )
13921394 # futures could be deleted from blocked_by one by one here, but that would be too expensive,
13931395 # it's done once when all are resolved to free their references
13941396 clear_blocked_by!
@@ -1428,15 +1430,15 @@ def clear_blocked_by!
14281430 end
14291431
14301432 # @return [true,false] if resolvable
1431- def resolvable? ( countdown , future )
1433+ def resolvable? ( countdown , future , index )
14321434 countdown . zero?
14331435 end
14341436
1435- def process_on_resolution ( future )
1437+ def process_on_resolution ( future , index )
14361438 @Countdown . decrement
14371439 end
14381440
1439- def on_resolvable ( resolved_future )
1441+ def on_resolvable ( resolved_future , index )
14401442 raise NotImplementedError
14411443 end
14421444 end
@@ -1465,7 +1467,7 @@ def initialize(blocked_by_future, default_executor, executor, args, &task)
14651467 super blocked_by_future , default_executor , executor , args , &task
14661468 end
14671469
1468- def on_resolvable ( resolved_future )
1470+ def on_resolvable ( resolved_future , index )
14691471 if resolved_future . fulfilled?
14701472 Concurrent . executor ( @Executor ) . post ( resolved_future , @Args , @Task ) do |future , args , task |
14711473 evaluate_to lambda { future . apply args , task }
@@ -1483,7 +1485,7 @@ def initialize(blocked_by_future, default_executor, executor, args, &task)
14831485 super blocked_by_future , default_executor , executor , args , &task
14841486 end
14851487
1486- def on_resolvable ( resolved_future )
1488+ def on_resolvable ( resolved_future , index )
14871489 if resolved_future . rejected?
14881490 Concurrent . executor ( @Executor ) . post ( resolved_future , @Args , @Task ) do |future , args , task |
14891491 evaluate_to lambda { future . apply args , task }
@@ -1497,7 +1499,7 @@ def on_resolvable(resolved_future)
14971499 class ChainPromise < BlockedTaskPromise
14981500 private
14991501
1500- def on_resolvable ( resolved_future )
1502+ def on_resolvable ( resolved_future , index )
15011503 if Future === resolved_future
15021504 Concurrent . executor ( @Executor ) . post ( resolved_future , @Args , @Task ) do |future , args , task |
15031505 evaluate_to ( *future . result , *args , task )
@@ -1536,7 +1538,7 @@ def initialize_blocked_by(blocked_by_future)
15361538 @BlockedBy = LockFreeStack . new . push ( blocked_by_future )
15371539 end
15381540
1539- def on_resolvable ( resolved_future )
1541+ def on_resolvable ( resolved_future , index )
15401542 resolve_with resolved_future . internal_state
15411543 end
15421544
@@ -1550,8 +1552,8 @@ def blocked_by_add(future)
15501552 future . touch if self . future . touched?
15511553 end
15521554
1553- def resolvable? ( countdown , future )
1554- !@Future . internal_state . resolved? && super ( countdown , future )
1555+ def resolvable? ( countdown , future , index )
1556+ !@Future . internal_state . resolved? && super ( countdown , future , index )
15551557 end
15561558 end
15571559
@@ -1563,8 +1565,8 @@ def initialize(blocked_by_future, default_executor)
15631565 super Event . new ( self , default_executor ) , blocked_by_future , 2
15641566 end
15651567
1566- def process_on_resolution ( future )
1567- countdown = super ( future )
1568+ def process_on_resolution ( future , index )
1569+ countdown = super ( future , index )
15681570 if countdown . nonzero?
15691571 internal_state = future . internal_state
15701572
@@ -1577,8 +1579,8 @@ def process_on_resolution(future)
15771579 case value
15781580 when Future , Event
15791581 blocked_by_add value
1580- value . add_callback :callback_notify_blocked , self
1581- @Countdown . value
1582+ value . add_callback :callback_notify_blocked , self , nil
1583+ countdown
15821584 else
15831585 resolve_with RESOLVED
15841586 end
@@ -1597,8 +1599,8 @@ def initialize(blocked_by_future, levels, default_executor)
15971599 super Future . new ( self , default_executor ) , blocked_by_future , 1 + levels
15981600 end
15991601
1600- def process_on_resolution ( future )
1601- countdown = super ( future )
1602+ def process_on_resolution ( future , index )
1603+ countdown = super ( future , index )
16021604 if countdown . nonzero?
16031605 internal_state = future . internal_state
16041606
@@ -1611,8 +1613,8 @@ def process_on_resolution(future)
16111613 case value
16121614 when Future
16131615 blocked_by_add value
1614- value . add_callback :callback_notify_blocked , self
1615- @Countdown . value
1616+ value . add_callback :callback_notify_blocked , self , nil
1617+ countdown
16161618 when Event
16171619 evaluate_to ( lambda { raise TypeError , 'cannot flatten to Event' } )
16181620 else
@@ -1632,7 +1634,7 @@ def initialize(blocked_by_future, default_executor)
16321634 super Future . new ( self , default_executor ) , blocked_by_future , 1
16331635 end
16341636
1635- def process_on_resolution ( future )
1637+ def process_on_resolution ( future , index )
16361638 internal_state = future . internal_state
16371639
16381640 unless internal_state . fulfilled?
@@ -1645,7 +1647,7 @@ def process_on_resolution(future)
16451647 when Future
16461648 # FIXME (pitr-ch 08-Dec-2016): will accumulate the completed futures
16471649 blocked_by_add value
1648- value . add_callback :callback_notify_blocked , self
1650+ value . add_callback :callback_notify_blocked , self , nil
16491651 else
16501652 resolve_with internal_state
16511653 end
@@ -1661,43 +1663,28 @@ def initialize(event1, event2, default_executor)
16611663
16621664 private
16631665
1664- def on_resolvable ( resolved_future )
1666+ def on_resolvable ( resolved_future , index )
16651667 resolve_with RESOLVED
16661668 end
16671669 end
16681670
16691671 class ZipFutureEventPromise < BlockedPromise
16701672 def initialize ( future , event , default_executor )
16711673 super Future . new ( self , default_executor ) , [ future , event ] , 2
1672- @FutureResult = future
1674+ @result = nil
16731675 end
16741676
16751677 private
16761678
1677- def on_resolvable ( resolved_future )
1678- resolve_with @FutureResult . internal_state
1679- end
1680- end
1681-
1682- class ZipFutureFuturePromise < BlockedPromise
1683- def initialize ( future1 , future2 , default_executor )
1684- super Future . new ( self , default_executor ) , [ future1 , future2 ] , 2
1685- @Future1Result = future1
1686- @Future2Result = future2
1679+ def process_on_resolution ( future , index )
1680+ # first blocking is future, take its result
1681+ @result = future . internal_state if index == 0
1682+ # super has to be called after above to piggyback on volatile @Countdown
1683+ super future , index
16871684 end
16881685
1689- private
1690-
1691- def on_resolvable ( resolved_future )
1692- fulfilled1 , value1 , reason1 = @Future1Result . result
1693- fulfilled2 , value2 , reason2 = @Future2Result . result
1694- fulfilled = fulfilled1 && fulfilled2
1695- new_state = if fulfilled
1696- FulfilledArray . new ( [ value1 , value2 ] )
1697- else
1698- PartiallyRejected . new ( [ value1 , value2 ] , [ reason1 , reason2 ] )
1699- end
1700- resolve_with new_state
1686+ def on_resolvable ( resolved_future , index )
1687+ resolve_with @result
17011688 end
17021689 end
17031690
@@ -1708,7 +1695,7 @@ def initialize(event, default_executor)
17081695
17091696 private
17101697
1711- def on_resolvable ( resolved_future )
1698+ def on_resolvable ( resolved_future , index )
17121699 resolve_with RESOLVED
17131700 end
17141701 end
@@ -1720,7 +1707,7 @@ def initialize(future, default_executor)
17201707
17211708 private
17221709
1723- def on_resolvable ( resolved_future )
1710+ def on_resolvable ( resolved_future , index )
17241711 resolve_with resolved_future . internal_state
17251712 end
17261713 end
@@ -1730,23 +1717,28 @@ class ZipFuturesPromise < BlockedPromise
17301717 private
17311718
17321719 def initialize ( blocked_by_futures , default_executor )
1733- super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
1720+ size = blocked_by_futures . size
1721+ super ( Future . new ( self , default_executor ) , blocked_by_futures , size )
1722+ @Resolutions = ::Array . new ( size )
1723+
1724+ on_resolvable nil , -1 if blocked_by_futures . empty?
1725+ end
17341726
1735- on_resolvable nil if blocked_by_futures . empty?
1727+ def process_on_resolution ( future , index )
1728+ countdown = super future , index
1729+ # TODO (pitr-ch 18-Dec-2016): Can we assume that array will never break under parallel access when never resized?
1730+ @Resolutions [ index ] = future . internal_state
1731+ countdown
17361732 end
17371733
1738- def on_resolvable ( resolved_future )
1734+ def on_resolvable ( resolved_future , index )
17391735 all_fulfilled = true
1740- values = Array . new ( blocked_by . size )
1741- reasons = Array . new ( blocked_by . size )
1736+ values = Array . new ( @Resolutions . size )
1737+ reasons = Array . new ( @Resolutions . size )
17421738
1743- blocked_by . each_with_index do |future , i |
1744- if future . is_a? ( Future )
1745- fulfilled , values [ i ] , reasons [ i ] = future . result
1746- all_fulfilled &&= fulfilled
1747- else
1748- values [ i ] = reasons [ i ] = nil
1749- end
1739+ @Resolutions . each_with_index do |internal_state , i |
1740+ fulfilled , values [ i ] , reasons [ i ] = internal_state . result
1741+ all_fulfilled &&= fulfilled
17501742 end
17511743
17521744 if all_fulfilled
@@ -1764,10 +1756,10 @@ class ZipEventsPromise < BlockedPromise
17641756 def initialize ( blocked_by_futures , default_executor )
17651757 super ( Event . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
17661758
1767- on_resolvable nil if blocked_by_futures . empty?
1759+ on_resolvable nil , - 1 if blocked_by_futures . empty?
17681760 end
17691761
1770- def on_resolvable ( resolved_future )
1762+ def on_resolvable ( resolved_future , index )
17711763 resolve_with RESOLVED
17721764 end
17731765 end
@@ -1784,11 +1776,11 @@ def initialize(blocked_by_futures, default_executor)
17841776 super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
17851777 end
17861778
1787- def resolvable? ( countdown , future )
1779+ def resolvable? ( countdown , future , index )
17881780 true
17891781 end
17901782
1791- def on_resolvable ( resolved_future )
1783+ def on_resolvable ( resolved_future , index )
17921784 resolve_with resolved_future . internal_state , false
17931785 end
17941786 end
@@ -1801,11 +1793,11 @@ def initialize(blocked_by_futures, default_executor)
18011793 super ( Event . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
18021794 end
18031795
1804- def resolvable? ( countdown , future )
1796+ def resolvable? ( countdown , future , index )
18051797 true
18061798 end
18071799
1808- def on_resolvable ( resolved_future )
1800+ def on_resolvable ( resolved_future , index )
18091801 resolve_with RESOLVED , false
18101802 end
18111803 end
@@ -1814,7 +1806,7 @@ class AnyFulfilledFuturePromise < AnyResolvedFuturePromise
18141806
18151807 private
18161808
1817- def resolvable? ( countdown , future )
1809+ def resolvable? ( countdown , future , index )
18181810 future . fulfilled? ||
18191811 # inlined super from BlockedPromise
18201812 countdown . zero?
@@ -1887,7 +1879,6 @@ def initialize(default_executor, intended_time)
18871879 :RunFuturePromise ,
18881880 :ZipEventEventPromise ,
18891881 :ZipFutureEventPromise ,
1890- :ZipFutureFuturePromise ,
18911882 :EventWrapperPromise ,
18921883 :FutureWrapperPromise ,
18931884 :ZipFuturesPromise ,
0 commit comments