11require 'concurrent/synchronization'
22require 'concurrent/atomic/atomic_boolean'
33require 'concurrent/atomic/atomic_fixnum'
4- require 'concurrent/lock_free_stack'
4+ require 'concurrent/edge/ lock_free_stack'
55require 'concurrent/errors'
66
77module Concurrent
@@ -781,7 +781,7 @@ class Event < AbstractEventFuture
781781 #
782782 # @return [Future, Event]
783783 def zip ( other )
784- if other . is ?( Future )
784+ if other . is_a ?( Future )
785785 ZipFutureEventPromise . new ( other , self , @DefaultExecutor ) . future
786786 else
787787 ZipEventEventPromise . new ( self , other , @DefaultExecutor ) . event
@@ -825,7 +825,20 @@ def schedule(intended_time)
825825 end . flat_event
826826 end
827827
828- # TODO (pitr-ch 12-Jun-2016): add to_event, to_future
828+ # Converts event to a future. The future is fulfilled when the event is resolved, the future may never fail.
829+ #
830+ # @return [Future]
831+ def to_future
832+ future = Promises . resolvable_future
833+ ensure
834+ chain_resolvable ( future )
835+ end
836+
837+ # Returns self, since this is event
838+ # @return [Event]
839+ def to_event
840+ self
841+ end
829842
830843 # @!macro promises.method.with_default_executor
831844 # @return [Event]
@@ -1111,6 +1124,21 @@ def apply(args, block)
11111124 internal_state . apply args , block
11121125 end
11131126
1127+ # Converts future to event which is resolved when future is resolved by fulfillment or rejection.
1128+ #
1129+ # @return [Event]
1130+ def to_event
1131+ event = Promises . resolvable_event
1132+ ensure
1133+ chain_resolvable ( event )
1134+ end
1135+
1136+ # Returns self, since this is a future
1137+ # @return [Future]
1138+ def to_future
1139+ self
1140+ end
1141+
11141142 private
11151143
11161144 def rejected_resolution ( raise_on_reassign , state )
@@ -1195,7 +1223,8 @@ class ResolvableFuture < Future
11951223 # which triggers all dependent futures.
11961224 #
11971225 # @!macro promise.param.raise_on_reassign
1198- def resolve ( fulfilled , value , reason , raise_on_reassign = true )
1226+ def resolve ( fulfilled = true , value = nil , reason = nil , raise_on_reassign = true )
1227+ # TODO (pitr-ch 25-Sep-2016): should the defaults be kept to match event resolve api?
11991228 resolve_with ( fulfilled ? Fulfilled . new ( value ) : Rejected . new ( reason ) , raise_on_reassign )
12001229 end
12011230
@@ -1288,8 +1317,8 @@ def resolve_with(new_state, raise_on_reassign = true)
12881317 # @return [Future]
12891318 def evaluate_to ( *args , block )
12901319 resolve_with Fulfilled . new ( block . call ( *args ) )
1291- # TODO (pitr-ch 30-Jul-2016): figure out what should be rescued, there is an issue about it
12921320 rescue Exception => error
1321+ # TODO (pitr-ch 30-Jul-2016): figure out what should be rescued, there is an issue about it
12931322 resolve_with Rejected . new ( error )
12941323 end
12951324 end
@@ -1358,7 +1387,7 @@ def on_resolution(future)
13581387
13591388 # @!visibility private
13601389 def touch
1361- # TODO (pitr-ch 13-Jun-2016): on construction pass down references of delays to be touched, avoids extra casses
1390+ # TODO (pitr-ch 13-Jun-2016): on construction pass down references of delays to be touched, avoids extra CASses
13621391 blocked_by . each ( &:touch )
13631392 end
13641393
@@ -1506,6 +1535,11 @@ def clear_blocked_by!
15061535 nil
15071536 end
15081537
1538+ def blocked_by_add ( future )
1539+ @BlockedBy . push future
1540+ future . touch if self . future . touched?
1541+ end
1542+
15091543 def resolvable? ( countdown , future )
15101544 !@Future . internal_state . resolved? && super ( countdown , future )
15111545 end
@@ -1532,7 +1566,7 @@ def process_on_resolution(future)
15321566 value = internal_state . value
15331567 case value
15341568 when Future , Event
1535- @BlockedBy . push value
1569+ blocked_by_add value
15361570 value . add_callback :callback_notify_blocked , self
15371571 @Countdown . value
15381572 else
@@ -1566,7 +1600,7 @@ def process_on_resolution(future)
15661600 value = internal_state . value
15671601 case value
15681602 when Future
1569- @BlockedBy . push value
1603+ blocked_by_add value
15701604 value . add_callback :callback_notify_blocked , self
15711605 @Countdown . value
15721606 when Event
@@ -1599,7 +1633,8 @@ def process_on_resolution(future)
15991633 value = internal_state . value
16001634 case value
16011635 when Future
1602- # @BlockedBy.push value
1636+ # FIXME (pitr-ch 08-Dec-2016): will accumulate the completed futures
1637+ blocked_by_add value
16031638 value . add_callback :callback_notify_blocked , self
16041639 else
16051640 resolve_with internal_state
@@ -1871,7 +1906,7 @@ module FactoryMethods
18711906 # only proof of concept
18721907 # @return [Future]
18731908 def select ( *channels )
1874- # TODO (pitr-ch 26-Mar-2016): redo , has to be non-blocking
1909+ # TODO (pitr-ch 26-Mar-2016): re-do , has to be non-blocking
18751910 future do
18761911 # noinspection RubyArgCount
18771912 Channel . select do |s |
@@ -1924,12 +1959,14 @@ def each_body(value, &block)
19241959 end
19251960 end
19261961
1962+ # TODO example: parallel jobs, cancell them all when one fails, clean-up in zip
19271963 # inspired by https://msdn.microsoft.com/en-us/library/dd537607(v=vs.110).aspx
19281964 class Cancellation < Synchronization ::Object
19291965 safe_initialization!
19301966
19311967 def self . create ( future_or_event = Promises . resolvable_event , *resolve_args )
1932- [ ( i = new ( future_or_event , *resolve_args ) ) , i . token ]
1968+ cancellation = new ( future_or_event , *resolve_args )
1969+ [ cancellation , cancellation . token ]
19331970 end
19341971
19351972 private_class_method :new
@@ -1960,20 +1997,18 @@ def initialize(cancel)
19601997 @Cancel = cancel
19611998 end
19621999
1963- def event
1964- @Cancel
2000+ def to_event
2001+ @Cancel . to_event
19652002 end
19662003
1967- alias_method :future , :event
2004+ def to_future
2005+ @Cancel . to_future
2006+ end
19682007
19692008 def on_cancellation ( *args , &block )
19702009 @Cancel . on_resolution *args , &block
19712010 end
19722011
1973- def then ( *args , &block )
1974- @Cancel . chain *args , &block
1975- end
1976-
19772012 def canceled?
19782013 @Cancel . resolved?
19792014 end
@@ -1985,13 +2020,14 @@ def loop_until_canceled(&block)
19852020 result
19862021 end
19872022
1988- def raise_if_canceled
1989- raise CancelledOperationError if canceled?
2023+ def raise_if_canceled ( error = CancelledOperationError )
2024+ raise error if canceled?
19902025 self
19912026 end
19922027
1993- def join ( *tokens )
1994- Token . new Promises . any_event ( @Cancel , *tokens . map ( &:event ) )
2028+ def join ( *tokens , &block )
2029+ block ||= -> tokens { Promises . any_event ( *tokens . map ( &:to_event ) ) }
2030+ self . class . new block . call ( [ @Cancel , *tokens ] )
19952031 end
19962032
19972033 end
@@ -2002,7 +2038,7 @@ def join(*tokens)
20022038 # TODO (pitr-ch 27-Mar-2016): examples (scheduled to be cancelled in 10 sec)
20032039 end
20042040
2005- class Throttle < Synchronization ::Object
2041+ class Promises :: Throttle < Synchronization ::Object
20062042
20072043 safe_initialization!
20082044 private *attr_atomic ( :can_run )
@@ -2015,16 +2051,23 @@ def initialize(max)
20152051 end
20162052
20172053 def limit ( future = nil , &block )
2018- # TODO (pitr-ch 11-Jun-2016): triggers should allocate resources when they are to be required
2019- trigger = future ? future & get_event : get_event
2020-
2021- if block_given?
2022- block . call ( trigger ) . on_resolution! { done }
2054+ if future
2055+ # future.chain { block.call(new_trigger & future).on_resolution! { done } }.flat
2056+ block . call ( new_trigger & future ) . on_resolution! { done }
20232057 else
2024- get_event
2058+ if block_given?
2059+ block . call ( new_trigger ) . on_resolution! { done }
2060+ else
2061+ new_trigger
2062+ end
20252063 end
20262064 end
20272065
2066+ # TODO (pitr-ch 10-Oct-2016): maybe just then?
2067+ def then_limit ( &block )
2068+ limit { |trigger | trigger . then &block }
2069+ end
2070+
20282071 def done
20292072 while true
20302073 current_can_run = can_run
@@ -2037,7 +2080,7 @@ def done
20372080
20382081 private
20392082
2040- def get_event
2083+ def new_trigger
20412084 while true
20422085 current_can_run = can_run
20432086 if compare_and_set_can_run current_can_run , current_can_run - 1
@@ -2059,5 +2102,18 @@ def throttle(throttle, &throttled_future)
20592102 throttle . limit ( self , &throttled_future )
20602103 end
20612104
2105+ def then_throttle ( throttle , &block )
2106+ throttle ( throttle ) { |trigger | trigger . then &block }
2107+ end
2108+
2109+ end
2110+
2111+ module Promises ::FactoryMethods
2112+
2113+ # @!visibility private
2114+
2115+ def throttle ( count )
2116+ Promises ::Throttle . new count
2117+ end
20622118 end
20632119end
0 commit comments