Skip to content

Commit fd2aca8

Browse files
authored
Merge pull request #7 from globocom/generics-support
2 parents 9df2015 + 99a0104 commit fd2aca8

File tree

11 files changed

+241
-141
lines changed

11 files changed

+241
-141
lines changed

.github/workflows/go.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
- name: Set up Go 1.x
1717
uses: actions/setup-go@v2
1818
with:
19-
go-version: ^1.14
19+
go-version: 1.24
2020
id: go
2121

2222
- name: Check out code into the Go module directory

.tool-versions

Lines changed: 0 additions & 1 deletion
This file was deleted.

README.md

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,140 @@
1414

1515
# go-buffer
1616

17-
`go-buffer` represents a buffer that asynchronously flushes its contents. It is useful for applications that need to aggregate data before writing it to an external storage. A buffer is flushed manually, or automatically when it becomes full or after an interval has elapsed, whichever comes first.
17+
`go-buffer` represents a buffer that asynchronously flushes its contents. It is useful for applications that need to
18+
aggregate data before writing it to an external storage. A buffer is flushed manually, or automatically when it becomes
19+
full or after an interval has elapsed, whichever comes first.
1820

1921
## Installation
2022

2123
go get github.com/globocom/go-buffer
2224

25+
Go < 1.18:
26+
27+
go get github.com/globocom/go-buffer@v2
28+
2329
## Examples
2430

31+
> [!NOTE]
32+
> For v2, see [Examples v2](#examples-v2).
33+
34+
### Size-triggered flush
35+
36+
```go
37+
package main
38+
39+
import (
40+
"time"
41+
42+
"github.com/globocom/go-buffer/v3"
43+
)
44+
45+
func main() {
46+
buff := buffer.New(
47+
// call this function when the buffer needs flushing
48+
func(items []string) {
49+
for _, item := range items {
50+
println(string)
51+
}
52+
},
53+
// buffer can hold up to 5 items
54+
buffer.WithSize(5),
55+
)
56+
// ensure the buffer
57+
defer buff.Close()
58+
59+
buff.Push("item 1")
60+
buff.Push("item 2")
61+
buff.Push("item 3")
62+
buff.Push("item 4")
63+
buff.Push("item 5")
64+
65+
// block the current goroutine
66+
time.Sleep(3 * time.Second)
67+
68+
println("done")
69+
}
70+
```
71+
72+
### Interval-triggered flush
73+
74+
```go
75+
package main
76+
77+
import (
78+
"time"
79+
80+
"github.com/globocom/go-buffer/v3"
81+
)
82+
83+
func main() {
84+
buff := buffer.New(
85+
// call this function when the buffer needs flushing
86+
func(items []string) {
87+
for _, item := range items {
88+
println(item)
89+
}
90+
},
91+
// buffer can hold up to 5 items
92+
buffer.WithSize(5),
93+
// buffer will be flushed every second, regardless of
94+
// how many items were pushed
95+
buffer.WithFlushInterval(time.Second),
96+
)
97+
defer buff.Close()
98+
99+
buff.Push("item 1")
100+
buff.Push("item 2")
101+
buff.Push("item 3")
102+
103+
// block the current goroutine
104+
time.Sleep(3 * time.Second)
105+
106+
println("done")
107+
}
108+
```
109+
110+
### Manual flush
111+
112+
```go
113+
package main
114+
115+
import (
116+
"time"
117+
118+
"github.com/globocom/go-buffer/v3"
119+
)
120+
121+
func main() {
122+
buff := buffer.New(
123+
// call this function when the buffer needs flushing
124+
func(items []string) {
125+
for _, item := range items {
126+
println(item)
127+
}
128+
},
129+
// buffer can hold up to 5 items
130+
buffer.WithSize(5),
131+
)
132+
defer buff.Close()
133+
134+
buff.Push("item 1")
135+
buff.Push("item 2")
136+
buff.Push("item 3")
137+
138+
// block the current goroutine
139+
time.Sleep(3*time.Second)
140+
141+
buff.Flush()
142+
println("done")
143+
}
144+
```
145+
146+
## Examples v2
147+
25148
### Size-triggered flush
26149

27-
```golang
150+
```go
28151
package main
29152

30153
import (
@@ -62,7 +185,7 @@ func main() {
62185

63186
### Interval-triggered flush
64187

65-
```golang
188+
```go
66189
package main
67190

68191
import (
@@ -100,7 +223,7 @@ func main() {
100223

101224
### Manual flush
102225

103-
```golang
226+
```go
104227
package main
105228

106229
import (

bench_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,21 @@ package buffer_test
33
import (
44
"testing"
55

6-
"github.com/globocom/go-buffer/v2"
6+
"github.com/globocom/go-buffer/v3"
77
)
88

99
func BenchmarkBuffer(b *testing.B) {
10-
noop := buffer.FlusherFunc(func([]interface{}) {})
10+
noop := func([]int) {}
1111

1212
b.Run("push only", func(b *testing.B) {
1313
sut := buffer.New(
14+
noop,
1415
buffer.WithSize(uint(b.N)+1),
15-
buffer.WithFlusher(noop),
1616
)
1717
defer sut.Close()
1818

19-
for i := 0; i < b.N; i++ {
20-
err := sut.Push(i)
19+
for b.Loop() {
20+
err := sut.Push(1)
2121
if err != nil {
2222
b.Fail()
2323
}
@@ -26,13 +26,13 @@ func BenchmarkBuffer(b *testing.B) {
2626

2727
b.Run("push and flush", func(b *testing.B) {
2828
sut := buffer.New(
29+
noop,
2930
buffer.WithSize(1),
30-
buffer.WithFlusher(noop),
3131
)
3232
defer sut.Close()
3333

34-
for i := 0; i < b.N; i++ {
35-
err := sut.Push(i)
34+
for b.Loop() {
35+
err := sut.Push(1)
3636
if err != nil {
3737
b.Fail()
3838
}

buffer.go

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,42 @@ var (
1515

1616
type (
1717
// Buffer represents a data buffer that is asynchronously flushed, either manually or automatically.
18-
Buffer struct {
18+
Buffer[T any] struct {
1919
io.Closer
20-
dataCh chan interface{}
21-
flushCh chan struct{}
22-
closeCh chan struct{}
23-
doneCh chan struct{}
24-
options *Options
20+
flushFunc func([]T)
21+
dataCh chan T
22+
flushCh chan struct{}
23+
closeCh chan struct{}
24+
doneCh chan struct{}
25+
options *Options
2526
}
2627
)
2728

29+
// New creates a new buffer instance with the provided flush function and options.
30+
// It panics if provided with a nil flush function.
31+
func New[T any](flushFunc func([]T), opts ...Option) *Buffer[T] {
32+
if flushFunc == nil {
33+
panic("flush function cannot be nil")
34+
}
35+
36+
buffer := &Buffer[T]{
37+
flushFunc: flushFunc,
38+
dataCh: make(chan T),
39+
flushCh: make(chan struct{}),
40+
closeCh: make(chan struct{}),
41+
doneCh: make(chan struct{}),
42+
options: resolveOptions(opts...),
43+
}
44+
go buffer.consume()
45+
46+
return buffer
47+
}
48+
2849
// Push appends an item to the end of the buffer.
2950
//
30-
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
51+
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
3152
// an ErrClosed if the buffer has been closed.
32-
func (buffer *Buffer) Push(item interface{}) error {
53+
func (buffer *Buffer[T]) Push(item T) error {
3354
if buffer.closed() {
3455
return ErrClosed
3556
}
@@ -46,7 +67,7 @@ func (buffer *Buffer) Push(item interface{}) error {
4667
//
4768
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
4869
// an ErrClosed if the buffer has been closed.
49-
func (buffer *Buffer) Flush() error {
70+
func (buffer *Buffer[T]) Flush() error {
5071
if buffer.closed() {
5172
return ErrClosed
5273
}
@@ -67,7 +88,7 @@ func (buffer *Buffer) Flush() error {
6788
// An ErrTimeout can either mean that a flush could not be triggered, or it can
6889
// mean that a flush was triggered but it has not finished yet. In any case it is
6990
// safe to call Close again.
70-
func (buffer *Buffer) Close() error {
91+
func (buffer *Buffer[T]) Close() error {
7192
if buffer.closed() {
7293
return ErrClosed
7394
}
@@ -90,7 +111,7 @@ func (buffer *Buffer) Close() error {
90111
}
91112
}
92113

93-
func (buffer Buffer) closed() bool {
114+
func (buffer *Buffer[T]) closed() bool {
94115
select {
95116
case <-buffer.doneCh:
96117
return true
@@ -99,9 +120,9 @@ func (buffer Buffer) closed() bool {
99120
}
100121
}
101122

102-
func (buffer *Buffer) consume() {
123+
func (buffer *Buffer[T]) consume() {
103124
count := 0
104-
items := make([]interface{}, buffer.options.Size)
125+
items := make([]T, buffer.options.Size)
105126
mustFlush := false
106127
ticker, stopTicker := newTicker(buffer.options.FlushInterval)
107128

@@ -123,10 +144,10 @@ func (buffer *Buffer) consume() {
123144

124145
if mustFlush {
125146
stopTicker()
126-
buffer.options.Flusher.Write(items[:count])
147+
buffer.flushFunc(items[:count])
127148

128149
count = 0
129-
items = make([]interface{}, buffer.options.Size)
150+
items = make([]T, buffer.options.Size)
130151
mustFlush = false
131152
ticker, stopTicker = newTicker(buffer.options.FlushInterval)
132153
}
@@ -144,17 +165,3 @@ func newTicker(interval time.Duration) (<-chan time.Time, func()) {
144165
ticker := time.NewTicker(interval)
145166
return ticker.C, ticker.Stop
146167
}
147-
148-
// New creates a new buffer instance with the provided options.
149-
func New(opts ...Option) *Buffer {
150-
buffer := &Buffer{
151-
dataCh: make(chan interface{}),
152-
flushCh: make(chan struct{}),
153-
closeCh: make(chan struct{}),
154-
doneCh: make(chan struct{}),
155-
options: resolveOptions(opts...),
156-
}
157-
go buffer.consume()
158-
159-
return buffer
160-
}

0 commit comments

Comments
 (0)