Skip to content

Commit 2348707

Browse files
committed
add concurrent_write for kafka-client
1 parent aa8ebfc commit 2348707

File tree

5 files changed

+86
-6
lines changed

5 files changed

+86
-6
lines changed

kafka-clients/zapkafka/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
all:
2+
go build github.com/bigwhite/zapkafka/cmd/concurrent_write
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
log "github.com/bigwhite/zapkafka"
11+
"go.uber.org/zap"
12+
"go.uber.org/zap/zapcore"
13+
)
14+
15+
func SaramaProducer() {
16+
p, err := log.NewKafkaAsyncProducer([]string{"localhost:29092"})
17+
if err != nil {
18+
panic(err)
19+
}
20+
logger := log.New(log.NewKafkaSyncer(p, "test", zapcore.AddSync(os.Stderr)), int8(0))
21+
var wg sync.WaitGroup
22+
var cnt int64
23+
24+
for j := 0; j < 10; j++ {
25+
wg.Add(1)
26+
go func(j int) {
27+
var value string
28+
for i := 0; i < 10000; i++ {
29+
now := time.Now()
30+
value = fmt.Sprintf("%02d-%04d-%s", j, i, now.Format("15:04:05"))
31+
logger.Info("log message:", zap.String("value", value))
32+
atomic.AddInt64(&cnt, 1)
33+
}
34+
wg.Done()
35+
}(j)
36+
}
37+
38+
wg.Wait()
39+
logger.Sync()
40+
println("cnt =", atomic.LoadInt64(&cnt))
41+
time.Sleep(10 * time.Second)
42+
}
43+
44+
func main() {
45+
SaramaProducer()
46+
}

kafka-clients/zapkafka/kafka_syncer.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,16 @@ func (ws *kafkaWriteSyncer) Write(b []byte) (n int, err error) {
4444
Topic: ws.topic,
4545
Value: sarama.ByteEncoder(b1),
4646
}
47+
ws.producer.Input() <- msg
4748

48-
select {
49-
case ws.producer.Input() <- msg:
50-
default:
51-
// if producer block on input channel, write log entry to default fallbackSyncer
52-
return ws.fallbackSyncer.Write(b1)
53-
}
49+
/*
50+
select {
51+
case ws.producer.Input() <- msg:
52+
default:
53+
// if producer block on input channel, write log entry to default fallbackSyncer
54+
return ws.fallbackSyncer.Write(b1)
55+
}
56+
*/
5457

5558
return len(b1), nil
5659
}

kafka-clients/zapkafka/log.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ func (l *Logger) Info(msg string, fields ...zap.Field) {
1818
l.l.Info(msg, fields...)
1919
}
2020

21+
func (l *Logger) Sync() {
22+
l.l.Sync()
23+
}
24+
2125
func New(writer io.Writer, level int8, opts ...zap.Option) *Logger {
2226
if writer == nil {
2327
panic("the writer is nil")

kafka-clients/zapkafka/log_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.uber.org/zap"
1414
)
1515

16+
/*
1617
func TestWriteFailWithKafkaSyncer(t *testing.T) {
1718
config := sarama.NewConfig()
1819
config.Producer.Return.Successes = true
@@ -46,6 +47,30 @@ func TestWriteFailWithKafkaSyncer(t *testing.T) {
4647
t.Errorf("want true, actual false")
4748
}
4849
}
50+
*/
51+
52+
func TestWriteFailWithKafkaSyncer(t *testing.T) {
53+
config := sarama.NewConfig()
54+
p := mocks.NewAsyncProducer(t, config)
55+
56+
var buf = []byte{}
57+
w := bytes.NewBuffer(buf)
58+
logger := New(NewKafkaSyncer(p, "test", NewFileSyncer(w)), 0)
59+
60+
// all below will be written to the fallback sycner
61+
logger.Info("demo1", zap.String("status", "ok")) // write to the kafka syncer
62+
63+
p.ExpectInputAndFail(errors.New("produce error"))
64+
65+
s := string(w.Bytes())
66+
if !strings.Contains(s, "demo1") {
67+
t.Errorf("want true, actual false")
68+
}
69+
70+
if err := p.Close(); err != nil {
71+
t.Error(err)
72+
}
73+
}
4974

5075
func TestWriteOKWithKafkaSyncer(t *testing.T) {
5176
config := sarama.NewConfig()

0 commit comments

Comments
 (0)