@@ -14,6 +14,35 @@ module Concurrent
1414 # {include:file:doc/promises-main.md}
1515 module Promises
1616
17+ # TODO (pitr-ch 23-Dec-2016): move out
18+ # @!visibility private
19+ module ReInclude
20+ def included ( base )
21+ included_into << [ :include , base ]
22+ super ( base )
23+ end
24+
25+ def extended ( base )
26+ included_into << [ :extend , base ]
27+ super ( base )
28+ end
29+
30+ def include ( *modules )
31+ super ( *modules )
32+ modules . reverse . each do |module_being_included |
33+ included_into . each do |method , mod |
34+ mod . send method , module_being_included
35+ end
36+ end
37+ end
38+
39+ private
40+
41+ def included_into
42+ @included_into ||= [ ]
43+ end
44+ end
45+
1746 # @!macro [new] promises.param.default_executor
1847 # @param [Executor, :io, :fast] default_executor Instance of an executor or a name of the
1948 # global executor. Default executor propagates to chained futures unless overridden with
@@ -46,6 +75,7 @@ module Promises
4675 # Container of all {Future}, {Event} factory methods. They are never constructed directly with
4776 # new.
4877 module FactoryMethods
78+ extend ReInclude
4979
5080 # @!macro promises.shortcut.on
5181 # @return [ResolvableEvent]
@@ -1922,115 +1952,6 @@ def then_ask(actor)
19221952 include ActorIntegration
19231953 end
19241954
1925- # A tool manage concurrency level of future tasks.
1926- # @example With futures
1927- # data = (1..5).to_a
1928- # db = data.reduce({}) { |h, v| h.update v => v.to_s }
1929- # max_two = Promises.throttle 2
1930- #
1931- # futures = data.map do |data|
1932- # Promises.future(data) { |data|
1933- # # un-throttled, concurrency level equal data.size
1934- # data + 1
1935- # }.then_throttle(max_two, db) { |v, db|
1936- # # throttled, only 2 tasks executed at the same time
1937- # # e.g. limiting access to db
1938- # db[v]
1939- # }
1940- # end
1941- #
1942- # futures.map(&:value!) # => [2, 3, 4, 5, nil]
1943- #
1944- # @example With Threads
1945- # max_two = Concurrent::Throttle.new 2
1946- # 5.timse
1947- class Throttle < Synchronization ::Object
1948- # TODO (pitr-ch 23-Dec-2016): move into different file
1949- # TODO (pitr-ch 23-Dec-2016): move to Concurrent space
1950- # TODO (pitr-ch 21-Dec-2016): consider using sized channel for implementation instead when available
1951-
1952- safe_initialization!
1953- private *attr_atomic ( :can_run )
1954-
1955- # New throttle.
1956- # @param [Integer] limit
1957- def initialize ( limit )
1958- super ( )
1959- @Limit = limit
1960- self . can_run = limit
1961- @Queue = LockFreeQueue . new
1962- end
1963-
1964- # @return [Integer] The limit.
1965- def limit
1966- @Limit
1967- end
1968-
1969- def trigger
1970- while true
1971- current_can_run = can_run
1972- if compare_and_set_can_run current_can_run , current_can_run - 1
1973- if current_can_run > 0
1974- return Promises . resolved_event
1975- else
1976- event = Promises . resolvable_event
1977- @Queue . push event
1978- return event
1979- end
1980- end
1981- end
1982- end
1983-
1984- def release
1985- while true
1986- current_can_run = can_run
1987- if compare_and_set_can_run current_can_run , current_can_run + 1
1988- if current_can_run < 0
1989- Thread . pass until ( trigger = @Queue . pop )
1990- trigger . resolve
1991- end
1992- return self
1993- end
1994- end
1995- end
1996-
1997- # @return [String] Short string representation.
1998- def to_s
1999- format '<#%s:0x%x limit:%s can_run:%d>' , self . class , object_id << 1 , @Limit , can_run
2000- end
2001-
2002- alias_method :inspect , :to_s
2003-
2004- module PromisesIntegration
2005- # TODO (pitr-ch 23-Dec-2016): apply similar pattern elsewhere
2006-
2007- def throttled ( &throttled_futures )
2008- throttled_futures . call ( trigger ) . on_resolution! { release }
2009- end
2010-
2011- def then_throttled ( *args , &task )
2012- trigger . then ( *args , &task ) . on_resolution! { release }
2013- end
2014- end
2015-
2016- include PromisesIntegration
2017- end
2018-
2019- class AbstractEventFuture < Synchronization ::Object
2020- module ThrottleIntegration
2021- def throttled_by ( throttle , &throttled_futures )
2022- a_trigger = throttle . trigger & self
2023- throttled_futures . call ( a_trigger ) . on_resolution! { throttle . release }
2024- end
2025-
2026- def then_throttled_by ( throttle , *args , &block )
2027- throttled_by ( throttle ) { |trigger | trigger . then ( *args , &block ) }
2028- end
2029- end
2030-
2031- include ThrottleIntegration
2032- end
2033-
20341955 ### Experimental features follow
20351956
20361957 module FactoryMethods
0 commit comments