@@ -66,7 +66,7 @@ def completed_event(default_executor = :io)
6666 # requested by calling `#wait`, `#value`, `#value!`, etc. on it or on any of the chained futures.
6767 # @return [Future]
6868 def delay ( default_executor = :io , &task )
69- Delay . new ( default_executor ) . future . then ( &task )
69+ DelayPromise . new ( default_executor ) . future . then ( &task )
7070 end
7171
7272 # Schedules the block to be executed on executor in given intended_time.
@@ -98,8 +98,18 @@ def zip_events(*futures_and_or_events)
9898 # Constructs new {Future} which is completed after first of the futures is complete.
9999 # @param [Event] futures
100100 # @return [Future]
101- def any ( *futures )
102- AnyPromise . new ( futures , :io ) . future
101+ def any_complete ( *futures )
102+ AnyCompletePromise . new ( futures , :io ) . future
103+ end
104+
105+ alias_method :any , :any_complete
106+
107+ # Constructs new {Future} which becomes succeeded after first of the futures succeedes or
108+ # failed if all futures fail (reason is last error).
109+ # @param [Event] futures
110+ # @return [Future]
111+ def any_successful ( *futures )
112+ AnySuccessfulPromise . new ( futures , :io ) . future
103113 end
104114
105115 # only proof of concept
@@ -137,13 +147,11 @@ def post_on(executor, *args, &job)
137147 # TODO allow to to have a zip point for many futures and process them in batches by 10
138148 end
139149
140- extend FutureShortcuts
141- include FutureShortcuts
142-
143150 # Represents an event which will happen in future (will be completed). It has to always happen.
144- class Event < Synchronization ::LockableObject
151+ class Event < Synchronization ::Object
145152 safe_initialization!
146153 private ( *attr_atomic ( :internal_state ) )
154+ # @!visibility private
147155 public :internal_state
148156 include Concern ::Deprecation
149157 include Concern ::Logging
@@ -188,13 +196,13 @@ def to_sym
188196
189197 def initialize ( promise , default_executor )
190198 super ( )
199+ @Lock = Mutex . new
200+ @Condition = ConditionVariable . new
191201 @Promise = promise
192202 @DefaultExecutor = default_executor
193- @Touched = AtomicBoolean . new ( false )
203+ @Touched = AtomicBoolean . new false
194204 @Callbacks = LockFreeStack . new
195- # TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
196- # TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
197- @Waiters = LockFreeStack . new
205+ @Waiters = AtomicFixnum . new 0
198206 self . internal_state = PENDING
199207 end
200208
@@ -276,7 +284,7 @@ def zip(other)
276284 # Inserts delay into the chain of Futures making rest of it lazy evaluated.
277285 # @return [Event]
278286 def delay
279- ZipEventEventPromise . new ( self , Delay . new ( @DefaultExecutor ) . event , @DefaultExecutor ) . event
287+ ZipEventEventPromise . new ( self , DelayPromise . new ( @DefaultExecutor ) . event , @DefaultExecutor ) . event
280288 end
281289
282290 # # Schedules rest of the chain for execution with specified time or on specified time
@@ -298,13 +306,13 @@ def then_select(*channels)
298306 # @yield [success, value, reason] executed async on `executor` when completed
299307 # @return self
300308 def on_completion ( executor = nil , &callback )
301- add_callback :pr_async_callback_on_completion , executor || @DefaultExecutor , callback
309+ add_callback :async_callback_on_completion , executor || @DefaultExecutor , callback
302310 end
303311
304312 # @yield [success, value, reason] executed sync when completed
305313 # @return self
306314 def on_completion! ( &callback )
307- add_callback :pr_callback_on_completion , callback
315+ add_callback :callback_on_completion , callback
308316 end
309317
310318 # Changes default executor for rest of the chain
@@ -329,9 +337,8 @@ def set(*args, &block)
329337 # @!visibility private
330338 def complete_with ( state , raise_on_reassign = true )
331339 if compare_and_set_internal_state ( PENDING , state )
332- #(state)
333340 # go to synchronized block only if there were waiting threads
334- synchronize { ns_broadcast } if @Waiters . clear
341+ @Lock . synchronize { @Condition . broadcast } unless @Waiters . value == 0
335342 call_callbacks
336343 else
337344 Concurrent ::MultipleAssignmentError . new ( 'Event can be completed only once' ) if raise_on_reassign
@@ -388,32 +395,31 @@ def waiting_threads
388395
389396 # @return [true, false]
390397 def wait_until_complete ( timeout )
391- while true
392- last_waiter = @Waiters . peek # waiters' state before completion
393- return true if completed?
394-
395- # synchronize so it cannot be signaled before it waits
396- synchronize do
397- # ok only if completing thread did not start signaling
398- next unless @Waiters . compare_and_push last_waiter , Thread . current
399- return ns_wait_until ( timeout ) { completed? }
398+ return true if completed?
399+
400+ @Lock . synchronize do
401+ @Waiters . increment
402+ unless completed?
403+ @Condition . wait @Lock , timeout
400404 end
405+ @Waiters . decrement
401406 end
407+ completed?
402408 end
403409
404- def pr_with_async ( executor , *args , &block )
410+ def with_async ( executor , *args , &block )
405411 Concurrent . post_on ( executor , *args , &block )
406412 end
407413
408- def pr_async_callback_on_completion ( executor , callback )
409- pr_with_async ( executor ) { pr_callback_on_completion callback }
414+ def async_callback_on_completion ( executor , callback )
415+ with_async ( executor ) { callback_on_completion callback }
410416 end
411417
412- def pr_callback_on_completion ( callback )
418+ def callback_on_completion ( callback )
413419 callback . call
414420 end
415421
416- def pr_callback_notify_blocked ( promise )
422+ def callback_notify_blocked ( promise )
417423 promise . on_done self
418424 end
419425
@@ -660,13 +666,13 @@ def flat(level = 1)
660666
661667 # @return [Future] which has first completed value from futures
662668 def any ( *futures )
663- AnyPromise . new ( [ self , *futures ] , @DefaultExecutor ) . future
669+ AnyCompletePromise . new ( [ self , *futures ] , @DefaultExecutor ) . future
664670 end
665671
666672 # Inserts delay into the chain of Futures making rest of it lazy evaluated.
667673 # @return [Future]
668674 def delay
669- ZipFutureEventPromise . new ( self , Delay . new ( @DefaultExecutor ) . future , @DefaultExecutor ) . future
675+ ZipFutureEventPromise . new ( self , DelayPromise . new ( @DefaultExecutor ) . future , @DefaultExecutor ) . future
670676 end
671677
672678 # Schedules rest of the chain for execution with specified time or on specified time
@@ -714,32 +720,32 @@ def then_put(channel)
714720 # @yield [value] executed async on `executor` when success
715721 # @return self
716722 def on_success ( executor = nil , &callback )
717- add_callback :pr_async_callback_on_success , executor || @DefaultExecutor , callback
723+ add_callback :async_callback_on_success , executor || @DefaultExecutor , callback
718724 end
719725
720726 # @yield [reason] executed async on `executor` when failed?
721727 # @return self
722728 def on_failure ( executor = nil , &callback )
723- add_callback :pr_async_callback_on_failure , executor || @DefaultExecutor , callback
729+ add_callback :async_callback_on_failure , executor || @DefaultExecutor , callback
724730 end
725731
726732 # @yield [value] executed sync when success
727733 # @return self
728734 def on_success! ( &callback )
729- add_callback :pr_callback_on_success , callback
735+ add_callback :callback_on_success , callback
730736 end
731737
732738 # @yield [reason] executed sync when failed?
733739 # @return self
734740 def on_failure! ( &callback )
735- add_callback :pr_callback_on_failure , callback
741+ add_callback :callback_on_failure , callback
736742 end
737743
738744 # @!visibility private
739745 def complete_with ( state , raise_on_reassign = true )
740746 if compare_and_set_internal_state ( PENDING , state )
741- @Waiters . clear
742- synchronize { ns_broadcast }
747+ # go to synchronized block only if there were waiting threads
748+ @Lock . synchronize { @Condition . broadcast } unless @Waiters . value == 0
743749 call_callbacks state
744750 else
745751 if raise_on_reassign
@@ -792,37 +798,37 @@ def call_callback(method, state, *args)
792798 self . send method , state , *args
793799 end
794800
795- def pr_async_callback_on_success ( state , executor , callback )
796- pr_with_async ( executor , state , callback ) do |st , cb |
797- pr_callback_on_success st , cb
801+ def async_callback_on_success ( state , executor , callback )
802+ with_async ( executor , state , callback ) do |st , cb |
803+ callback_on_success st , cb
798804 end
799805 end
800806
801- def pr_async_callback_on_failure ( state , executor , callback )
802- pr_with_async ( executor , state , callback ) do |st , cb |
803- pr_callback_on_failure st , cb
807+ def async_callback_on_failure ( state , executor , callback )
808+ with_async ( executor , state , callback ) do |st , cb |
809+ callback_on_failure st , cb
804810 end
805811 end
806812
807- def pr_callback_on_success ( state , callback )
813+ def callback_on_success ( state , callback )
808814 state . apply callback if state . success?
809815 end
810816
811- def pr_callback_on_failure ( state , callback )
817+ def callback_on_failure ( state , callback )
812818 state . apply callback unless state . success?
813819 end
814820
815- def pr_callback_on_completion ( state , callback )
821+ def callback_on_completion ( state , callback )
816822 callback . call state . result
817823 end
818824
819- def pr_callback_notify_blocked ( state , promise )
825+ def callback_notify_blocked ( state , promise )
820826 super ( promise )
821827 end
822828
823- def pr_async_callback_on_completion ( state , executor , callback )
824- pr_with_async ( executor , state , callback ) do |st , cb |
825- pr_callback_on_completion st , cb
829+ def async_callback_on_completion ( state , executor , callback )
830+ with_async ( executor , state , callback ) do |st , cb |
831+ callback_on_completion st , cb
826832 end
827833 end
828834
@@ -1001,7 +1007,7 @@ class InnerPromise < AbstractPromise
10011007 class BlockedPromise < InnerPromise
10021008 def self . new ( *args , &block )
10031009 promise = super ( *args , &block )
1004- promise . blocked_by . each { |f | f . add_callback :pr_callback_notify_blocked , promise }
1010+ promise . blocked_by . each { |f | f . add_callback :callback_notify_blocked , promise }
10051011 promise
10061012 end
10071013
@@ -1014,7 +1020,7 @@ def initialize(future, blocked_by_futures, countdown)
10141020 # @api private
10151021 def on_done ( future )
10161022 countdown = process_on_done ( future )
1017- completable = completable? ( countdown )
1023+ completable = completable? ( countdown , future )
10181024
10191025 if completable
10201026 on_completable ( future )
@@ -1051,7 +1057,7 @@ def clear_blocked_by!
10511057 end
10521058
10531059 # @return [true,false] if completable
1054- def completable? ( countdown )
1060+ def completable? ( countdown , future )
10551061 countdown . zero?
10561062 end
10571063
@@ -1171,7 +1177,7 @@ def process_on_done(future)
11711177 case value
11721178 when Future
11731179 @BlockedBy . push value
1174- value . add_callback :pr_callback_notify_blocked , self
1180+ value . add_callback :callback_notify_blocked , self
11751181 @Countdown . value
11761182 when Event
11771183 evaluate_to ( lambda { raise TypeError , 'cannot flatten to Event' } )
@@ -1200,8 +1206,8 @@ def clear_blocked_by!
12001206 nil
12011207 end
12021208
1203- def completable? ( countdown )
1204- !@Future . internal_state . completed? && super ( countdown )
1209+ def completable? ( countdown , future )
1210+ !@Future . internal_state . completed? && super ( countdown , future )
12051211 end
12061212 end
12071213
@@ -1321,7 +1327,7 @@ def on_completable(done_future)
13211327 end
13221328
13231329 # @!visibility private
1324- class AnyPromise < BlockedPromise
1330+ class AnyCompletePromise < BlockedPromise
13251331
13261332 private
13271333
@@ -1331,7 +1337,7 @@ def initialize(blocked_by_futures, default_executor)
13311337 super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
13321338 end
13331339
1334- def completable? ( countdown )
1340+ def completable? ( countdown , future )
13351341 true
13361342 end
13371343
@@ -1341,29 +1347,35 @@ def on_completable(done_future)
13411347 end
13421348
13431349 # @!visibility private
1344- class Delay < InnerPromise
1345- def touch
1346- @Future . complete_with Event ::COMPLETED
1347- end
1350+ class AnySuccessfulPromise < BlockedPromise
13481351
13491352 private
13501353
1351- def initialize ( default_executor )
1352- super Event . new ( self , default_executor )
1354+ def initialize ( blocked_by_futures , default_executor )
1355+ blocked_by_futures . all? { |f | f . is_a? Future } or
1356+ raise ArgumentError , 'accepts only Futures not Events'
1357+ super ( Future . new ( self , default_executor ) , blocked_by_futures , blocked_by_futures . size )
1358+ end
1359+
1360+ def completable? ( countdown , future )
1361+ future . success? || super ( countdown , future )
1362+ end
1363+
1364+ def on_completable ( done_future )
1365+ complete_with done_future . internal_state , false
13531366 end
13541367 end
13551368
13561369 # @!visibility private
1357- class DelayValue < InnerPromise
1370+ class DelayPromise < InnerPromise
13581371 def touch
1359- @Future . complete_with Future :: Success . new ( @Value )
1372+ @Future . complete_with Event :: COMPLETED
13601373 end
13611374
13621375 private
13631376
1364- def initialize ( default_executor , value )
1365- super Future . new ( self , default_executor )
1366- @Value = value
1377+ def initialize ( default_executor )
1378+ super Event . new ( self , default_executor )
13671379 end
13681380 end
13691381
@@ -1401,7 +1413,10 @@ def initialize(default_executor, intended_time)
14011413 end
14021414 end
14031415 end
1404-
1405- extend Edge ::FutureShortcuts
1406- include Edge ::FutureShortcuts
14071416end
1417+
1418+ Concurrent ::Edge . send :extend , Concurrent ::Edge ::FutureShortcuts
1419+ Concurrent ::Edge . send :include , Concurrent ::Edge ::FutureShortcuts
1420+
1421+ Concurrent . send :extend , Concurrent ::Edge ::FutureShortcuts
1422+ Concurrent . send :include , Concurrent ::Edge ::FutureShortcuts
0 commit comments