@@ -20,7 +20,7 @@ module Concurrent
2020 module Edge
2121
2222 module FutureShortcuts
23- # User is responsible for completing the event once.
23+ # User is responsible for completing the event once by {CompletableEvent#complete}
2424 # @return [CompletableEvent]
2525 def event ( default_executor = :io )
2626 CompletableEventPromise . new ( default_executor ) . future
@@ -29,42 +29,57 @@ def event(default_executor = :io)
2929 # @overload future(default_executor = :io, &task)
3030 # Constructs new Future which will be completed after block is evaluated on executor. Evaluation begins immediately.
3131 # @return [Future]
32- # @note FIXME allow to pass in variables as Thread.new(args) {|args| _ } does
3332 # @overload future(default_executor = :io)
34- # User is responsible for completing the future once.
33+ # User is responsible for completing the future once by {CompletableFuture#success} or {CompletableFuture#fail}
3534 # @return [CompletableFuture]
36- def future ( default_executor = :io , &task )
35+ def future ( *args , &task )
36+ future_on :io , *args , &task
37+ end
38+
39+ def future_on ( default_executor , *args , &task )
3740 if task
38- ImmediatePromise . new ( default_executor ) . event . chain ( &task )
41+ ImmediatePromise . new ( default_executor , * args ) . future . then ( &task )
3942 else
4043 CompletableFuturePromise . new ( default_executor ) . future
4144 end
4245 end
4346
4447 alias_method :async , :future
4548
46- # Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delayed until
47- # requested by `#wait`, `#value`, `#value!`, etc.
49+ # Constructs new Future which will evaluate to the block after
50+ # requested by calling `#wait`, `#value`, `#value!`, etc. on it or on any of the chained futures .
4851 # @return [Delay]
49- def delay ( default_executor = :io , &task )
50- Delay . new ( default_executor ) . event . chain ( &task )
52+ def delay ( *args , &task )
53+ delay_on :io , *args , &task
54+ end
55+
56+ def delay_on ( default_executor , *args , &task )
57+ Delay . new ( default_executor , *args ) . future . then ( &task )
5158 end
5259
5360 # Schedules the block to be executed on executor in given intended_time.
5461 # @return [Future]
55- def schedule ( intended_time , default_executor = :io , &task )
56- ScheduledPromise . new ( intended_time , default_executor ) . future . chain ( &task )
62+ def schedule ( intended_time , *args , &task )
63+ schedule_on :io , intended_time , *args , &task
64+ end
65+
66+ def schedule_on ( default_executor , intended_time , *args , &task )
67+ ScheduledPromise . new ( default_executor , intended_time , *args ) . future . then ( &task )
5768 end
5869
59- # fails on first error
60- # does not block a thread
70+ # Constructs new {Future} which is completed after all futures are complete. Its value is array
71+ # of dependent future values. If there is an error it fails with the first one.
72+ # @param [Event] futures
6173 # @return [Future]
6274 def zip ( *futures )
63- AllPromise . new ( futures ) . future
75+ AllPromise . new ( futures , :io ) . future
6476 end
6577
78+ # Constructs new {Future} which is completed after first of the futures is complete.
79+ # @param [Event] futures
80+ # @return [Future]
6681 def any ( *futures )
67- AnyPromise . new ( futures ) . future
82+ AnyPromise . new ( futures , :io ) . future
6883 end
6984
7085 def post! ( *args , &job )
@@ -89,7 +104,7 @@ def post_on(executor, *args, &job)
89104 class Event < Synchronization ::Object
90105 extend FutureShortcuts
91106
92- def initialize ( promise , default_executor = :io )
107+ def initialize ( promise , default_executor )
93108 @Promise = promise
94109 @DefaultExecutor = default_executor
95110 @Touched = AtomicBoolean . new ( false )
@@ -157,7 +172,7 @@ def delay
157172 end
158173
159174 def schedule ( intended_time )
160- chain { ScheduledPromise . new ( intended_time ) . future . zip ( self ) } . flat
175+ chain { ScheduledPromise . new ( @DefaultExecutor , intended_time ) . event . zip ( self ) } . flat
161176 end
162177
163178 # @yield [success, value, reason] executed async on `executor` when completed
@@ -172,7 +187,7 @@ def on_completion!(&callback)
172187 add_callback :pr_callback_on_completion , callback
173188 end
174189
175- def with_default_executor ( executor = @DefaultExecutor )
190+ def with_default_executor ( executor )
176191 AllPromise . new ( [ self ] , executor ) . future
177192 end
178193
@@ -615,7 +630,7 @@ def evaluate_to(*args, block)
615630 class CompletableEventPromise < AbstractPromise
616631 public :complete
617632
618- def initialize ( default_executor = :io )
633+ def initialize ( default_executor )
619634 super CompletableEvent . new ( self , default_executor )
620635 end
621636 end
@@ -624,7 +639,7 @@ def initialize(default_executor = :io)
624639 class CompletableFuturePromise < AbstractPromise
625640 # TODO consider to allow being blocked_by
626641
627- def initialize ( default_executor = :io )
642+ def initialize ( default_executor )
628643 super CompletableFuture . new ( self , default_executor )
629644 end
630645
@@ -732,7 +747,7 @@ def on_completable(done_future)
732747
733748 # @abstract
734749 class BlockedTaskPromise < BlockedPromise
735- def initialize ( blocked_by_future , default_executor = :io , executor = default_executor , &task )
750+ def initialize ( blocked_by_future , default_executor , executor , &task )
736751 raise ArgumentError , 'no block given' unless block_given?
737752 @Executor = executor
738753 @Task = task
@@ -747,7 +762,7 @@ def executor
747762 class ThenPromise < BlockedTaskPromise
748763 private
749764
750- def initialize ( blocked_by_future , default_executor = :io , executor = default_executor , &task )
765+ def initialize ( blocked_by_future , default_executor , executor , &task )
751766 raise ArgumentError , 'only Future can be appended with then' unless blocked_by_future . is_a? Future
752767 super blocked_by_future , default_executor , executor , &task
753768 end
@@ -766,7 +781,7 @@ def on_completable(done_future)
766781 class RescuePromise < BlockedTaskPromise
767782 private
768783
769- def initialize ( blocked_by_future , default_executor = :io , executor = default_executor , &task )
784+ def initialize ( blocked_by_future , default_executor , executor , &task )
770785 raise ArgumentError , 'only Future can be rescued' unless blocked_by_future . is_a? Future
771786 super blocked_by_future , default_executor , executor , &task
772787 end
@@ -794,8 +809,16 @@ def on_completable(done_future)
794809
795810 # will be immediately completed
796811 class ImmediatePromise < InnerPromise
797- def initialize ( default_executor = :io )
798- super Event . new ( self , default_executor ) . complete
812+ def initialize ( default_executor , *args )
813+ # TODO optimize, create completed futures directly
814+ super case args . size
815+ when 0
816+ Event . new ( self , default_executor ) . complete
817+ when 1
818+ Future . new ( self , default_executor ) . complete ( true , args [ 0 ] , nil )
819+ else
820+ ArrayFuture . new ( self , default_executor ) . complete ( true , args , nil )
821+ end
799822 end
800823 end
801824
@@ -808,7 +831,7 @@ def blocked_by
808831
809832 def process_on_done ( future )
810833 countdown = super ( future )
811- value = future . value
834+ value = future . value!
812835 if countdown . nonzero?
813836 case value
814837 when Future
@@ -824,7 +847,7 @@ def process_on_done(future)
824847 countdown
825848 end
826849
827- def initialize ( blocked_by_future , levels = 1 , default_executor = :io )
850+ def initialize ( blocked_by_future , levels , default_executor )
828851 raise ArgumentError , 'levels has to be higher than 0' if levels < 1
829852 blocked_by_future . is_a? Future or
830853 raise ArgumentError , 'only Future can be flatten'
@@ -851,7 +874,7 @@ class AllPromise < BlockedPromise
851874
852875 private
853876
854- def initialize ( blocked_by_futures , default_executor = :io )
877+ def initialize ( blocked_by_futures , default_executor )
855878 klass = Event
856879 blocked_by_futures . each do |f |
857880 if f . is_a? ( Future )
@@ -901,7 +924,7 @@ class AnyPromise < BlockedPromise
901924
902925 private
903926
904- def initialize ( blocked_by_futures , default_executor = :io )
927+ def initialize ( blocked_by_futures , default_executor )
905928 blocked_by_futures . all? { |f | f . is_a? Future } or
906929 raise ArgumentError , 'accepts only Futures not Events'
907930 super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
@@ -918,13 +941,28 @@ def on_completable(done_future)
918941
919942 class Delay < InnerPromise
920943 def touch
921- complete
944+ case @Args . size
945+ when 0
946+ @Future . complete
947+ when 1
948+ @Future . complete ( true , @Args [ 0 ] , nil )
949+ else
950+ @Future . complete ( true , @Args , nil )
951+ end
922952 end
923953
924954 private
925955
926- def initialize ( default_executor = :io )
927- super Event . new ( self , default_executor )
956+ def initialize ( default_executor , *args )
957+ @Args = args
958+ super case args . size
959+ when 0
960+ Event . new ( self , default_executor )
961+ when 1
962+ Future . new ( self , default_executor )
963+ else
964+ ArrayFuture . new ( self , default_executor )
965+ end
928966 end
929967 end
930968
@@ -940,7 +978,7 @@ def inspect
940978
941979 private
942980
943- def initialize ( intended_time , default_executor = :io )
981+ def initialize ( default_executor , intended_time , * args )
944982 @IntendedTime = intended_time
945983
946984 in_seconds = begin
@@ -953,9 +991,26 @@ def initialize(intended_time, default_executor = :io)
953991 [ 0 , schedule_time . to_f - now . to_f ] . max
954992 end
955993
956- super Event . new ( self , default_executor )
994+ super case args . size
995+ when 0
996+ Event . new ( self , default_executor )
997+ when 1
998+ Future . new ( self , default_executor )
999+ else
1000+ ArrayFuture . new ( self , default_executor )
1001+ end
1002+
1003+ Concurrent . global_timer_set . post ( in_seconds , *args ) do |*args |
1004+ case args . size
1005+ when 0
1006+ @Future . complete
1007+ when 1
1008+ @Future . complete ( true , args [ 0 ] , nil )
1009+ else
1010+ @Future . complete ( true , args , nil )
1011+ end
1012+ end
9571013
958- Concurrent . global_timer_set . post ( in_seconds ) { complete }
9591014 end
9601015 end
9611016 end
0 commit comments