@@ -577,7 +577,7 @@ def chain(*args, &task)
577577 # @overload a_future.chain_on(executor, *args, &task)
578578 # @yield [fulfilled?, value, reason, *args] to the task.
579579 def chain_on ( executor , *args , &task )
580- ChainPromise . new_blocked ( [ self ] , @DefaultExecutor , executor , args , &task ) . future
580+ ChainPromise . new_blocked1 ( self , @DefaultExecutor , executor , args , &task ) . future
581581 end
582582
583583 # Short string representation.
@@ -664,7 +664,7 @@ def resolve_with(state, raise_on_reassign = true)
664664 # @return [Array<AbstractPromise>]
665665 def blocks
666666 # TODO (pitr-ch 18-Dec-2016): add macro noting that debug methods may change api without warning
667- @Callbacks . each_with_object ( [ ] ) do |( method , * args ) , promises |
667+ @Callbacks . each_with_object ( [ ] ) do |( method , args ) , promises |
668668 promises . push ( args [ 0 ] ) if method == :callback_notify_blocked
669669 end
670670 end
@@ -697,9 +697,9 @@ def waiting_threads
697697 def add_callback ( method , *args )
698698 state = internal_state
699699 if resolved? ( state )
700- call_callback method , state , * args
700+ call_callback method , state , args
701701 else
702- @Callbacks . push [ method , * args ]
702+ @Callbacks . push [ method , args ]
703703 state = internal_state
704704 # take back if it was resolved in the meanwhile
705705 call_callbacks state if resolved? ( state )
@@ -729,15 +729,15 @@ def wait_until_resolved(timeout)
729729 resolved?
730730 end
731731
732- def call_callback ( method , state , * args )
732+ def call_callback ( method , state , args )
733733 self . send method , state , *args
734734 end
735735
736736 def call_callbacks ( state )
737- method , * args = @Callbacks . pop
737+ method , args = @Callbacks . pop
738738 while method
739- call_callback method , state , * args
740- method , * args = @Callbacks . pop
739+ call_callback method , state , args
740+ method , args = @Callbacks . pop
741741 end
742742 end
743743
@@ -774,9 +774,9 @@ class Event < AbstractEventFuture
774774 # @return [Future, Event]
775775 def zip ( other )
776776 if other . is_a? ( Future )
777- ZipFutureEventPromise . new_blocked ( [ other , self ] , @DefaultExecutor ) . future
777+ ZipFutureEventPromise . new_blocked2 ( other , self , @DefaultExecutor ) . future
778778 else
779- ZipEventEventPromise . new_blocked ( [ self , other ] , @DefaultExecutor ) . event
779+ ZipEventEventPromise . new_blocked2 ( self , other , @DefaultExecutor ) . event
780780 end
781781 end
782782
@@ -787,7 +787,7 @@ def zip(other)
787787 #
788788 # @return [Event]
789789 def any ( event_or_future )
790- AnyResolvedEventPromise . new_blocked ( [ self , event_or_future ] , @DefaultExecutor ) . event
790+ AnyResolvedEventPromise . new_blocked2 ( self , event_or_future , @DefaultExecutor ) . event
791791 end
792792
793793 alias_method :| , :any
@@ -798,7 +798,7 @@ def any(event_or_future)
798798 # @return [Event]
799799 def delay
800800 event = DelayPromise . new ( @DefaultExecutor ) . event
801- ZipEventEventPromise . new_blocked ( [ self , event ] , @DefaultExecutor ) . event
801+ ZipEventEventPromise . new_blocked2 ( self , event , @DefaultExecutor ) . event
802802 end
803803
804804 # @!macro [new] promise.method.schedule
@@ -811,7 +811,7 @@ def delay
811811 def schedule ( intended_time )
812812 chain do
813813 event = ScheduledPromise . new ( @DefaultExecutor , intended_time ) . event
814- ZipEventEventPromise . new_blocked ( [ self , event ] , @DefaultExecutor ) . event
814+ ZipEventEventPromise . new_blocked2 ( self , event , @DefaultExecutor ) . event
815815 end . flat_event
816816 end
817817
@@ -833,7 +833,7 @@ def to_event
833833 # @!macro promises.method.with_default_executor
834834 # @return [Event]
835835 def with_default_executor ( executor )
836- EventWrapperPromise . new_blocked ( [ self ] , executor ) . event
836+ EventWrapperPromise . new_blocked1 ( self , executor ) . event
837837 end
838838
839839 private
@@ -946,7 +946,7 @@ def then(*args, &task)
946946 # @return [Future]
947947 # @yield [value, *args] to the task.
948948 def then_on ( executor , *args , &task )
949- ThenPromise . new_blocked ( [ self ] , @DefaultExecutor , executor , args , &task ) . future
949+ ThenPromise . new_blocked1 ( self , @DefaultExecutor , executor , args , &task ) . future
950950 end
951951
952952 # @!macro promises.shortcut.on
@@ -964,16 +964,16 @@ def rescue(*args, &task)
964964 # @return [Future]
965965 # @yield [reason, *args] to the task.
966966 def rescue_on ( executor , *args , &task )
967- RescuePromise . new_blocked ( [ self ] , @DefaultExecutor , executor , args , &task ) . future
967+ RescuePromise . new_blocked1 ( self , @DefaultExecutor , executor , args , &task ) . future
968968 end
969969
970970 # @!macro promises.method.zip
971971 # @return [Future]
972972 def zip ( other )
973973 if other . is_a? ( Future )
974- ZipFuturesPromise . new_blocked ( [ self , other ] , @DefaultExecutor ) . future
974+ ZipFuturesPromise . new_blocked2 ( self , other , @DefaultExecutor ) . future
975975 else
976- ZipFutureEventPromise . new_blocked ( [ self , other ] , @DefaultExecutor ) . future
976+ ZipFutureEventPromise . new_blocked2 ( self , other , @DefaultExecutor ) . future
977977 end
978978 end
979979
@@ -985,7 +985,7 @@ def zip(other)
985985 #
986986 # @return [Future]
987987 def any ( event_or_future )
988- AnyResolvedFuturePromise . new_blocked ( [ self , event_or_future ] , @DefaultExecutor ) . future
988+ AnyResolvedFuturePromise . new_blocked2 ( self , event_or_future , @DefaultExecutor ) . future
989989 end
990990
991991 alias_method :| , :any
@@ -996,22 +996,22 @@ def any(event_or_future)
996996 # @return [Future]
997997 def delay
998998 event = DelayPromise . new ( @DefaultExecutor ) . event
999- ZipFutureEventPromise . new_blocked ( [ self , event ] , @DefaultExecutor ) . future
999+ ZipFutureEventPromise . new_blocked2 ( self , event , @DefaultExecutor ) . future
10001000 end
10011001
10021002 # @!macro promise.method.schedule
10031003 # @return [Future]
10041004 def schedule ( intended_time )
10051005 chain do
10061006 event = ScheduledPromise . new ( @DefaultExecutor , intended_time ) . event
1007- ZipFutureEventPromise . new_blocked ( [ self , event ] , @DefaultExecutor ) . future
1007+ ZipFutureEventPromise . new_blocked2 ( self , event , @DefaultExecutor ) . future
10081008 end . flat
10091009 end
10101010
10111011 # @!macro promises.method.with_default_executor
10121012 # @return [Future]
10131013 def with_default_executor ( executor )
1014- FutureWrapperPromise . new_blocked ( [ self ] , executor ) . future
1014+ FutureWrapperPromise . new_blocked1 ( self , executor ) . future
10151015 end
10161016
10171017 # Creates new future which will have result of the future returned by receiver. If receiver
@@ -1020,7 +1020,7 @@ def with_default_executor(executor)
10201020 # @param [Integer] level how many levels of futures should flatten
10211021 # @return [Future]
10221022 def flat_future ( level = 1 )
1023- FlatFuturePromise . new_blocked ( [ self ] , level , @DefaultExecutor ) . future
1023+ FlatFuturePromise . new_blocked1 ( self , level , @DefaultExecutor ) . future
10241024 end
10251025
10261026 alias_method :flat , :flat_future
@@ -1030,7 +1030,7 @@ def flat_future(level = 1)
10301030 #
10311031 # @return [Event]
10321032 def flat_event
1033- FlatEventPromise . new_blocked ( [ self ] , @DefaultExecutor ) . event
1033+ FlatEventPromise . new_blocked1 ( self , @DefaultExecutor ) . event
10341034 end
10351035
10361036 # @!macro promises.shortcut.using
@@ -1104,7 +1104,7 @@ def on_rejection_using(executor, *args, &callback)
11041104 # end
11051105 # future(0, &body).run.value! # => 5
11061106 def run
1107- RunFuturePromise . new_blocked ( [ self ] , @DefaultExecutor ) . future
1107+ RunFuturePromise . new_blocked1 ( self , @DefaultExecutor ) . future
11081108 end
11091109
11101110 # @!visibility private
@@ -1199,7 +1199,7 @@ def resolve(raise_on_reassign = true)
11991199 #
12001200 # @return [Event]
12011201 def with_hidden_resolvable
1202- @with_hidden_resolvable ||= EventWrapperPromise . new_blocked ( [ self ] , @DefaultExecutor ) . event
1202+ @with_hidden_resolvable ||= EventWrapperPromise . new_blocked1 ( self , @DefaultExecutor ) . event
12031203 end
12041204 end
12051205
@@ -1255,7 +1255,7 @@ def evaluate_to!(*args, &block)
12551255 #
12561256 # @return [Future]
12571257 def with_hidden_resolvable
1258- @with_hidden_resolvable ||= FutureWrapperPromise . new_blocked ( [ self ] , @DefaultExecutor ) . future
1258+ @with_hidden_resolvable ||= FutureWrapperPromise . new_blocked1 ( self , @DefaultExecutor ) . future
12591259 end
12601260 end
12611261
@@ -1354,16 +1354,49 @@ class BlockedPromise < InnerPromise
13541354
13551355 private_class_method :new
13561356
1357+ def self . new_blocked1 ( blocker , *args , &block )
1358+ blocker_delayed = blocker . promise . delayed
1359+ delayed = blocker_delayed ? LockFreeStack . new . push ( blocker_delayed ) : nil
1360+ promise = new ( delayed , 1 , *args , &block )
1361+ ensure
1362+ blocker . add_callback :callback_notify_blocked , promise , 0
1363+ end
1364+
1365+ def self . new_blocked2 ( blocker1 , blocker2 , *args , &block )
1366+ blocker_delayed1 = blocker1 . promise . delayed
1367+ blocker_delayed2 = blocker2 . promise . delayed
1368+ delayed = if blocker_delayed1
1369+ if blocker_delayed2
1370+ LockFreeStack . new2 ( blocker_delayed1 , blocker_delayed2 )
1371+ else
1372+ LockFreeStack . new1 ( blocker_delayed1 )
1373+ end
1374+ else
1375+ blocker_delayed2 ? LockFreeStack . new1 ( blocker_delayed2 ) : nil
1376+ end
1377+ promise = new ( delayed , 2 , *args , &block )
1378+ ensure
1379+ blocker1 . add_callback :callback_notify_blocked , promise , 0
1380+ blocker2 . add_callback :callback_notify_blocked , promise , 1
1381+ end
1382+
13571383 def self . new_blocked ( blockers , *args , &block )
1358- delayed = blockers . each_with_object ( LockFreeStack . new , &method ( :add_delayed ) )
1384+ delayed = blockers . reduce ( nil , &method ( :add_delayed ) )
13591385 promise = new ( delayed , blockers . size , *args , &block )
13601386 ensure
13611387 blockers . each_with_index { |f , i | f . add_callback :callback_notify_blocked , promise , i }
13621388 end
13631389
1364- def self . add_delayed ( blocker , delayed )
1365- d = blocker . promise . delayed
1366- delayed . push ( d ) if d
1390+ def self . add_delayed ( delayed , blocker )
1391+ blocker_delayed = blocker . promise . delayed
1392+ if blocker_delayed
1393+ delayed = unless delayed
1394+ LockFreeStack . new1 ( blocker_delayed )
1395+ else
1396+ delayed . push ( blocker_delayed )
1397+ end
1398+ end
1399+ delayed
13671400 end
13681401
13691402 def initialize ( delayed , blockers_count , future )
@@ -1390,7 +1423,7 @@ def touch
13901423 end
13911424
13921425 def clear_propagate_touch
1393- @Delayed . clear_each { |o | propagate_touch o }
1426+ @Delayed . clear_each { |o | propagate_touch o } if @Delayed
13941427 end
13951428
13961429 # @!visibility private
@@ -1535,7 +1568,7 @@ def add_delayed_of(future)
15351568 if touched?
15361569 propagate_touch future . promise . delayed
15371570 else
1538- BlockedPromise . add_delayed future , @Delayed
1571+ BlockedPromise . add_delayed @Delayed , future
15391572 clear_propagate_touch if touched?
15401573 end
15411574 end
@@ -1581,7 +1614,9 @@ class FlatFuturePromise < AbstractFlatPromise
15811614
15821615 def initialize ( delayed , blockers_count , levels , default_executor )
15831616 raise ArgumentError , 'levels has to be higher than 0' if levels < 1
1584- super delayed , 1 + levels , Future . new ( self , default_executor )
1617+ # flat promise may result to a future having delayed futures, therefore we have to have empty stack
1618+ # to be able to add new delayed futures
1619+ super delayed || LockFreeStack . new , 1 + levels , Future . new ( self , default_executor )
15851620 end
15861621
15871622 def process_on_blocker_resolution ( future , index )
@@ -1915,7 +1950,7 @@ class Future < AbstractEventFuture
19151950 # @return [Future]
19161951 def then_select ( *channels )
19171952 future = Concurrent ::Promises . select ( *channels )
1918- ZipFuturesPromise . new_blocked ( [ self , future ] , @DefaultExecutor ) . future
1953+ ZipFuturesPromise . new_blocked2 ( self , future , @DefaultExecutor ) . future
19191954 end
19201955
19211956 # @note may block
0 commit comments