We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent ee91747 commit 3bfec51Copy full SHA for 3bfec51
buffer.go
@@ -64,13 +64,12 @@ func (buffer *Buffer) Close() error {
64
}
65
66
func (buffer *Buffer) consume() {
67
+ count := 0
68
items := make([]interface{}, buffer.options.Size)
69
+ mustFlush := false
70
ticker, stopTicker := newTicker(buffer.options.FlushInterval)
71
- count := 0
72
isOpen := true
- mustFlush := false
73
-
74
for isOpen {
75
select {
76
case item := <-buffer.dataCh:
@@ -89,7 +88,9 @@ func (buffer *Buffer) consume() {
89
88
if mustFlush {
90
stopTicker()
91
buffer.options.Flusher.Write(items[:count])
+
92
count = 0
93
+ items = make([]interface{}, buffer.options.Size)
94
mustFlush = false
95
ticker, stopTicker = newTicker(buffer.options.FlushInterval)
96
0 commit comments