Skip to content

Commit 51ee39f

Browse files
committed
update test of zapkafka
1 parent 2348707 commit 51ee39f

File tree

2 files changed

+14
-39
lines changed

2 files changed

+14
-39
lines changed

kafka-clients/zapkafka/file_syncer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ func NewFileSyncer(writer io.Writer) zapcore.WriteSyncer {
1010
if ws, ok := writer.(zapcore.WriteSyncer); ok {
1111
return ws
1212
}
13-
return zapcore.AddSync(writer)
13+
return zapcore.Lock(zapcore.AddSync(writer))
1414
}

kafka-clients/zapkafka/log_test.go

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,65 +7,39 @@ import (
77
"fmt"
88
"strings"
99
"testing"
10+
"time"
1011

1112
"github.com/Shopify/sarama"
1213
"github.com/Shopify/sarama/mocks"
1314
"go.uber.org/zap"
1415
)
1516

16-
/*
1717
func TestWriteFailWithKafkaSyncer(t *testing.T) {
1818
config := sarama.NewConfig()
19-
config.Producer.Return.Successes = true
20-
config.ChannelBufferSize = 0 // make sure the success channel is unbuffered to block the kafka syncer
2119
p := mocks.NewAsyncProducer(t, config)
2220

23-
var buf = []byte{}
21+
var buf = make([]byte, 0, 256)
2422
w := bytes.NewBuffer(buf)
23+
w.Write([]byte("hello"))
2524
logger := New(NewKafkaSyncer(p, "test", NewFileSyncer(w)), 0)
2625

27-
logger.Info("demo1", zap.String("status", "ok")) // write to the kafka syncer
28-
p.ExpectInputAndSucceed()
29-
30-
// all below will be written to the fallback sycner
31-
logger.Info("demo2", zap.String("status", "ok"))
32-
logger.Info("demo3", zap.String("status", "ok"))
33-
logger.Info("demo4", zap.String("status", "ok"))
34-
logger.Info("demo5", zap.String("status", "ok"))
35-
36-
s := string(w.Bytes())
37-
if !strings.Contains(s, "demo2") {
38-
t.Errorf("want true, actual false")
39-
}
40-
if !strings.Contains(s, "demo3") {
41-
t.Errorf("want true, actual false")
42-
}
43-
if !strings.Contains(s, "demo4") {
44-
t.Errorf("want true, actual false")
45-
}
46-
if !strings.Contains(s, "demo5") {
47-
t.Errorf("want true, actual false")
48-
}
49-
}
50-
*/
51-
52-
func TestWriteFailWithKafkaSyncer(t *testing.T) {
53-
config := sarama.NewConfig()
54-
p := mocks.NewAsyncProducer(t, config)
55-
56-
var buf = []byte{}
57-
w := bytes.NewBuffer(buf)
58-
logger := New(NewKafkaSyncer(p, "test", NewFileSyncer(w)), 0)
26+
p.ExpectInputAndFail(errors.New("produce error"))
27+
p.ExpectInputAndFail(errors.New("produce error"))
5928

6029
// all below will be written to the fallback sycner
6130
logger.Info("demo1", zap.String("status", "ok")) // write to the kafka syncer
31+
logger.Info("demo2", zap.String("status", "ok")) // write to the kafka syncer
6232

63-
p.ExpectInputAndFail(errors.New("produce error"))
33+
// make sure the goroutine which handles the error writes the log to the fallback syncer
34+
time.Sleep(2 * time.Second)
6435

6536
s := string(w.Bytes())
6637
if !strings.Contains(s, "demo1") {
6738
t.Errorf("want true, actual false")
6839
}
40+
if !strings.Contains(s, "demo2") {
41+
t.Errorf("want true, actual false")
42+
}
6943

7044
if err := p.Close(); err != nil {
7145
t.Error(err)
@@ -80,7 +54,6 @@ func TestWriteOKWithKafkaSyncer(t *testing.T) {
8054
w := bytes.NewBuffer(buf)
8155

8256
logger := New(NewKafkaSyncer(p, "test", NewFileSyncer(w)), 0)
83-
logger.Info("demo1", zap.String("status", "ok"))
8457

8558
messageChecker := func(msg *sarama.ProducerMessage) error {
8659
b, err := msg.Value.Encode()
@@ -126,6 +99,8 @@ func TestWriteOKWithKafkaSyncer(t *testing.T) {
12699
}
127100

128101
p.ExpectInputWithMessageCheckerFunctionAndSucceed(mocks.MessageChecker(messageChecker))
102+
logger.Info("demo1", zap.String("status", "ok"))
103+
129104
if err := p.Close(); err != nil {
130105
t.Error(err)
131106
}

0 commit comments

Comments
 (0)