Skip to content

Commit 3093ea2

Browse files
authored
Merge pull request #6 from globocom/handle-buffer-closed
Prevent panics from happening when the buffer is closed
2 parents 0127683 + 115ecd8 commit 3093ea2

File tree

2 files changed

+87
-12
lines changed

2 files changed

+87
-12
lines changed

buffer.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
var (
1010
// ErrTimeout indicates an operation has timed out.
1111
ErrTimeout = errors.New("operation timed-out")
12+
// ErrClosed indicates the buffer is closed and can no longer be used.
13+
ErrClosed = errors.New("buffer is closed")
1214
)
1315

1416
type (
@@ -23,9 +25,15 @@ type (
2325
}
2426
)
2527

26-
// Push appends an item to the end of the buffer. It times out if it cannot be
27-
// performed in a timely fashion.
28+
// Push appends an item to the end of the buffer.
29+
//
30+
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
31+
// an ErrClosed if the buffer has been closed.
2832
func (buffer *Buffer) Push(item interface{}) error {
33+
if buffer.closed() {
34+
return ErrClosed
35+
}
36+
2937
select {
3038
case buffer.dataCh <- item:
3139
return nil
@@ -34,9 +42,15 @@ func (buffer *Buffer) Push(item interface{}) error {
3442
}
3543
}
3644

37-
// Flush outputs the buffer to a permanent destination. It times out if it cannot be
38-
// performed in a timely fashion.
45+
// Flush outputs the buffer to a permanent destination.
46+
//
47+
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
48+
// an ErrClosed if the buffer has been closed.
3949
func (buffer *Buffer) Flush() error {
50+
if buffer.closed() {
51+
return ErrClosed
52+
}
53+
4054
select {
4155
case buffer.flushCh <- struct{}{}:
4256
return nil
@@ -45,9 +59,19 @@ func (buffer *Buffer) Flush() error {
4559
}
4660
}
4761

48-
// Close flushes the buffer and prevents it from being further used. If it succeeds,
49-
// the buffer cannot be used after it has been closed as all further operations will panic.
62+
// Close flushes the buffer and prevents it from being further used.
63+
//
64+
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
65+
// an ErrClosed if the buffer has already been closed.
66+
//
67+
// An ErrTimeout can either mean that a flush could not be triggered, or it can
68+
// mean that a flush was triggered but it has not finished yet. In any case it is
69+
// safe to call Close again.
5070
func (buffer *Buffer) Close() error {
71+
if buffer.closed() {
72+
return ErrClosed
73+
}
74+
5175
select {
5276
case buffer.closeCh <- struct{}{}:
5377
// noop
@@ -66,6 +90,15 @@ func (buffer *Buffer) Close() error {
6690
}
6791
}
6892

93+
func (buffer Buffer) closed() bool {
94+
select {
95+
case <-buffer.doneCh:
96+
return true
97+
default:
98+
return false
99+
}
100+
}
101+
69102
func (buffer *Buffer) consume() {
70103
count := 0
71104
items := make([]interface{}, buffer.options.Size)

buffer_test.go

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,21 @@ var _ = Describe("Buffer", func() {
126126
Expect(err2).To(Succeed())
127127
Expect(err3).To(MatchError(buffer.ErrTimeout))
128128
})
129+
130+
It("fails when the buffer is closed", func() {
131+
// arrange
132+
sut := buffer.New(
133+
buffer.WithSize(2),
134+
buffer.WithFlusher(flusher),
135+
)
136+
_ = sut.Close()
137+
138+
// act
139+
err := sut.Push(1)
140+
141+
// assert
142+
Expect(err).To(MatchError(buffer.ErrClosed))
143+
})
129144
})
130145

131146
Context("Flushing", func() {
@@ -204,6 +219,21 @@ var _ = Describe("Buffer", func() {
204219
// assert
205220
Expect(err).To(MatchError(buffer.ErrTimeout))
206221
})
222+
223+
It("fails when the buffer is closed", func() {
224+
// arrange
225+
sut := buffer.New(
226+
buffer.WithSize(2),
227+
buffer.WithFlusher(flusher),
228+
)
229+
_ = sut.Close()
230+
231+
// act
232+
err := sut.Flush()
233+
234+
// assert
235+
Expect(err).To(MatchError(buffer.ErrClosed))
236+
})
207237
})
208238

209239
Context("Closing", func() {
@@ -244,7 +274,7 @@ var _ = Describe("Buffer", func() {
244274
Expect(err).To(MatchError(buffer.ErrTimeout))
245275
})
246276

247-
It("allow Close to be called again if it fails", func() {
277+
It("fails when the buffer is closed", func() {
248278
// arrange
249279
flusher.Func = func() { time.Sleep(2 * time.Second) }
250280

@@ -253,22 +283,34 @@ var _ = Describe("Buffer", func() {
253283
buffer.WithFlusher(flusher),
254284
buffer.WithCloseTimeout(time.Second),
255285
)
256-
_ = sut.Push(1)
286+
_ = sut.Close()
257287

258288
// act
259289
err := sut.Close()
260290

261291
// assert
262-
Expect(err).To(MatchError(buffer.ErrTimeout))
292+
Expect(err).To(MatchError(buffer.ErrClosed))
293+
})
263294

295+
It("allows Close to be called again if it fails", func() {
264296
// arrange
265-
time.Sleep(time.Second)
297+
flusher.Func = func() { time.Sleep(2 * time.Second) }
298+
299+
sut := buffer.New(
300+
buffer.WithSize(1),
301+
buffer.WithFlusher(flusher),
302+
buffer.WithCloseTimeout(time.Second),
303+
)
304+
_ = sut.Push(1)
266305

267306
// act
268-
err = sut.Close()
307+
err1 := sut.Close()
308+
time.Sleep(time.Second)
309+
err2 := sut.Close()
269310

270311
// assert
271-
Expect(err).To(BeNil())
312+
Expect(err1).To(MatchError(buffer.ErrTimeout))
313+
Expect(err2).To(Succeed())
272314
})
273315
})
274316
})

0 commit comments

Comments
 (0)