@@ -8,8 +8,10 @@ module Concurrent
88
99 # {include:file:doc/channel.md}
1010 class Channel
11+ extend Forwardable
1112 include Enumerable
1213
14+ # NOTE: Move to global IO pool once stable
1315 GOROUTINES = Concurrent ::CachedThreadPool . new
1416 private_constant :GOROUTINES
1517
@@ -32,10 +34,17 @@ def initialize(message = nil)
3234 end
3335 end
3436
37+ def_delegators :buffer ,
38+ :size , :capacity , :close , :closed? ,
39+ :blocking? , :empty? , :full?
40+
41+ alias_method :length , :size
42+ alias_method :stop , :close
43+
3544 def initialize ( opts = { } )
3645 # undocumented -- for internal use only
3746 if opts . is_a? Buffer ::Base
38- @ buffer = opts
47+ self . buffer = opts
3948 return
4049 end
4150
@@ -45,25 +54,20 @@ def initialize(opts = {})
4554 if size && buffer == :unbuffered
4655 raise ArgumentError . new ( 'unbuffered channels cannot have a size' )
4756 elsif size . nil? && buffer . nil?
48- @ buffer = BUFFER_TYPES [ :unbuffered ] . new
57+ self . buffer = BUFFER_TYPES [ :unbuffered ] . new
4958 elsif size == 0 && buffer == :buffered
50- @ buffer = BUFFER_TYPES [ :unbuffered ] . new
59+ self . buffer = BUFFER_TYPES [ :unbuffered ] . new
5160 elsif buffer == :unbuffered
52- @ buffer = BUFFER_TYPES [ :unbuffered ] . new
61+ self . buffer = BUFFER_TYPES [ :unbuffered ] . new
5362 elsif size . nil? || size < 1
5463 raise ArgumentError . new ( 'size must be at least 1 for this buffer type' )
5564 else
5665 buffer ||= :buffered
57- @ buffer = BUFFER_TYPES [ buffer ] . new ( size )
66+ self . buffer = BUFFER_TYPES [ buffer ] . new ( size )
5867 end
5968
60- @validator = opts . fetch ( :validator , DEFAULT_VALIDATOR )
61- end
62-
63- def size
64- @buffer . size
69+ self . validator = opts . fetch ( :validator , DEFAULT_VALIDATOR )
6570 end
66- alias_method :capacity , :size
6771
6872 def put ( item )
6973 return false unless validate ( item , false , false )
@@ -129,22 +133,21 @@ def take?
129133 item
130134 end
131135
136+ # @example
132137 #
133- # @example
138+ # jobs = Channel.new
134139 #
135- # jobs = Channel.new
136- #
137- # Channel.go do
138- # loop do
139- # j, more = jobs.next
140- # if more
141- # print "received job #{j}\n"
142- # else
143- # print "received all jobs\n"
144- # break
145- # end
140+ # Channel.go do
141+ # loop do
142+ # j, more = jobs.next
143+ # if more
144+ # print "received job #{j}\n"
145+ # else
146+ # print "received all jobs\n"
147+ # break
146148 # end
147149 # end
150+ # end
148151 def next
149152 item , more = do_next
150153 item = nil if item == Buffer ::NO_VALUE
@@ -191,11 +194,6 @@ def each
191194 end
192195 end
193196
194- def close
195- @buffer . close
196- end
197- alias_method :stop , :close
198-
199197 class << self
200198 def timer ( seconds )
201199 Channel . new ( Buffer ::Timer . new ( seconds ) )
@@ -240,10 +238,12 @@ def go_loop_via(executor, *args, &block)
240238
241239 private
242240
241+ attr_accessor :buffer , :validator
242+
243243 def validate ( value , allow_nil , raise_error )
244244 if !allow_nil && value . nil?
245245 raise_error ? raise ( ValidationError . new ( 'nil is not a valid value' ) ) : false
246- elsif !@ validator. call ( value )
246+ elsif !validator . call ( value )
247247 raise_error ? raise ( ValidationError ) : false
248248 else
249249 true
@@ -254,19 +254,19 @@ def validate(value, allow_nil, raise_error)
254254 end
255255
256256 def do_put ( item )
257- @ buffer. put ( item )
257+ buffer . put ( item )
258258 end
259259
260260 def do_offer ( item )
261- @ buffer. offer ( item )
261+ buffer . offer ( item )
262262 end
263263
264264 def do_next
265- @ buffer. next
265+ buffer . next
266266 end
267267
268268 def do_poll
269- @ buffer. poll
269+ buffer . poll
270270 end
271271 end
272272end
0 commit comments