1+ require 'concurrent/synchronization'
12require 'concurrent/channel/waitable_list'
23
34module Concurrent
45 module Channel
56
67 # @api Channel
78 # @!macro edge_warning
8- class BufferedChannel
9+ class BufferedChannel < Synchronization :: Object
910
1011 def initialize ( size )
11- @mutex = Mutex . new
12- @buffer_condition = ConditionVariable . new
13-
14- @probe_set = WaitableList . new
15- @buffer = RingBuffer . new ( size )
12+ super ( )
13+ synchronize { ns_initialize ( size ) }
1614 end
1715
1816 def probe_set_size
1917 @probe_set . size
2018 end
2119
2220 def buffer_queue_size
23- @mutex . synchronize { @buffer . count }
21+ synchronize { @buffer . count }
2422 end
2523
2624 def push ( value )
27- until set_probe_or_push_into_buffer ( value )
25+ until set_probe_or_ns_push_into_buffer ( value )
2826 end
2927 end
3028
@@ -35,53 +33,57 @@ def pop
3533 end
3634
3735 def select ( probe )
38- @mutex . synchronize do
39-
36+ synchronize do
4037 if @buffer . empty?
4138 @probe_set . put ( probe )
4239 true
4340 else
44- shift_buffer if probe . try_set ( [ peek_buffer , self ] )
41+ ns_shift_buffer if probe . try_set ( [ ns_peek_buffer , self ] )
4542 end
46-
4743 end
4844 end
4945
5046 def remove_probe ( probe )
5147 @probe_set . delete ( probe )
5248 end
5349
50+ protected
51+
52+ def ns_initialize ( size )
53+ @probe_set = WaitableList . new
54+ @buffer = RingBuffer . new ( size )
55+ end
56+
5457 private
5558
56- def push_into_buffer ( value )
57- @buffer_condition . wait ( @mutex ) while @buffer . full?
59+ def ns_push_into_buffer ( value )
60+ ns_wait while @buffer . full?
5861 @buffer . offer value
59- @buffer_condition . broadcast
62+ ns_broadcast
6063 end
6164
62- def peek_buffer
63- @buffer_condition . wait ( @mutex ) while @buffer . empty?
65+ def ns_peek_buffer
66+ ns_wait while @buffer . empty?
6467 @buffer . peek
6568 end
6669
67- def shift_buffer
68- @buffer_condition . wait ( @mutex ) while @buffer . empty?
70+ def ns_shift_buffer
71+ ns_wait while @buffer . empty?
6972 result = @buffer . poll
70- @buffer_condition . broadcast
73+ ns_broadcast
7174 result
7275 end
7376
74- def set_probe_or_push_into_buffer ( value )
75- @mutex . synchronize do
77+ def set_probe_or_ns_push_into_buffer ( value )
78+ synchronize do
7679 if @probe_set . empty?
77- push_into_buffer ( value )
80+ ns_push_into_buffer ( value )
7881 true
7982 else
8083 @probe_set . take . try_set ( [ value , self ] )
8184 end
8285 end
8386 end
84-
8587 end
8688 end
8789end
0 commit comments