Skip to content

Commit bfb9996

Browse files
authored
Merge pull request #2 from globocom/fix-close-race
fix race condition on Close
2 parents ee91747 + 8ca7384 commit bfb9996

File tree

3 files changed

+41
-11
lines changed

3 files changed

+41
-11
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
test:
2-
@go run github.com/onsi/ginkgo/ginkgo -keepGoing -progress -timeout 1m -race
2+
@go run github.com/onsi/ginkgo/ginkgo -keepGoing -progress -timeout 1m -race --randomizeAllSpecs --randomizeSuites

buffer.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,25 @@ func (buffer *Buffer) Flush() error {
4545
}
4646
}
4747

48-
// Close flushes the buffer and prevents it from being further used. The buffer
49-
// cannot be used after it has been closed as all further operations will panic.
48+
// Close flushes the buffer and prevents it from being further used. If it succeeds,
49+
// the buffer cannot be used after it has been closed as all further operations will panic.
5050
func (buffer *Buffer) Close() error {
51-
close(buffer.closeCh)
51+
select {
52+
case buffer.closeCh <- struct{}{}:
53+
// noop
54+
case <-time.After(buffer.options.CloseTimeout):
55+
return ErrTimeout
56+
}
5257

53-
var err error
5458
select {
5559
case <-buffer.doneCh:
56-
err = nil
60+
close(buffer.dataCh)
61+
close(buffer.flushCh)
62+
close(buffer.closeCh)
63+
return nil
5764
case <-time.After(buffer.options.CloseTimeout):
58-
err = ErrTimeout
65+
return ErrTimeout
5966
}
60-
61-
close(buffer.dataCh)
62-
close(buffer.flushCh)
63-
return err
6467
}
6568

6669
func (buffer *Buffer) consume() {

buffer_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,33 @@ var _ = Describe("Buffer", func() {
243243
// assert
244244
Expect(err).To(MatchError(buffer.ErrTimeout))
245245
})
246+
247+
It("allow Close to be called again if it fails", func() {
248+
// arrange
249+
flusher.Func = func() { time.Sleep(2 * time.Second) }
250+
251+
sut := buffer.New(
252+
buffer.WithSize(1),
253+
buffer.WithFlusher(flusher),
254+
buffer.WithCloseTimeout(time.Second),
255+
)
256+
_ = sut.Push(1)
257+
258+
// act
259+
err := sut.Close()
260+
261+
// assert
262+
Expect(err).To(MatchError(buffer.ErrTimeout))
263+
264+
// arrange
265+
time.Sleep(time.Second)
266+
267+
// act
268+
err = sut.Close()
269+
270+
// assert
271+
Expect(err).To(BeNil())
272+
})
246273
})
247274
})
248275

0 commit comments

Comments
 (0)