Skip to content

Commit ee91747

Browse files
authored
Merge pull request #1 from globocom/fix-close-race-condition
Fix Close race condition
2 parents 613be7b + e818981 commit ee91747

File tree

1 file changed

+17
-10
lines changed

1 file changed

+17
-10
lines changed

buffer.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type (
1717
io.Closer
1818
dataCh chan interface{}
1919
flushCh chan struct{}
20+
closeCh chan struct{}
2021
doneCh chan struct{}
2122
options *Options
2223
}
@@ -44,19 +45,22 @@ func (buffer *Buffer) Flush() error {
4445
}
4546
}
4647

47-
// Close flushes the buffer and prevents it from being further used. It times
48-
// out if it cannot be performed in a timely fashion.
49-
// The buffer must not be used after it has been closed as all further operations will panic.
48+
// Close flushes the buffer and prevents it from being further used. The buffer
49+
// cannot be used after it has been closed as all further operations will panic.
5050
func (buffer *Buffer) Close() error {
51-
close(buffer.dataCh)
52-
close(buffer.flushCh)
51+
close(buffer.closeCh)
5352

53+
var err error
5454
select {
5555
case <-buffer.doneCh:
56-
return nil
56+
err = nil
5757
case <-time.After(buffer.options.CloseTimeout):
58-
return ErrTimeout
58+
err = ErrTimeout
5959
}
60+
61+
close(buffer.dataCh)
62+
close(buffer.flushCh)
63+
return err
6064
}
6165

6266
func (buffer *Buffer) consume() {
@@ -75,8 +79,10 @@ func (buffer *Buffer) consume() {
7579
mustFlush = count >= len(items)
7680
case <-ticker:
7781
mustFlush = count > 0
78-
case _, open := <-buffer.flushCh:
79-
isOpen = open
82+
case <-buffer.flushCh:
83+
mustFlush = count > 0
84+
case <-buffer.closeCh:
85+
isOpen = false
8086
mustFlush = count > 0
8187
}
8288

@@ -90,7 +96,7 @@ func (buffer *Buffer) consume() {
9096
}
9197

9298
stopTicker()
93-
buffer.doneCh <- struct{}{}
99+
close(buffer.doneCh)
94100
}
95101

96102
func newTicker(interval time.Duration) (<-chan time.Time, func()) {
@@ -124,6 +130,7 @@ func New(opts ...Option) *Buffer {
124130
buffer := &Buffer{
125131
dataCh: make(chan interface{}),
126132
flushCh: make(chan struct{}),
133+
closeCh: make(chan struct{}),
127134
doneCh: make(chan struct{}),
128135
options: options,
129136
}

0 commit comments

Comments
 (0)