Skip to content

Commit c771569

Browse files
committed
add confluent-kafka-go-static-build
1 parent 51ee39f commit c771569

File tree

4 files changed

+116
-0
lines changed

4 files changed

+116
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
module producer

go 1.18

require github.com/confluentinc/confluent-kafka-go v1.8.2
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
bootstrap.servers=localhost:29092
security.protocol=PLAINTEXT
# NOTE(review): the sasl.* settings below are ignored when
# security.protocol=PLAINTEXT; they only take effect with SASL_PLAINTEXT
# or SASL_SSL. Kept for easy switching to an authenticated cluster.
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=

# Best practice for Kafka producer to prevent data loss
acks=all
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"os"
7+
"strings"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
12+
"github.com/confluentinc/confluent-kafka-go/kafka"
13+
)
14+
15+
func ReadConfig(configFile string) kafka.ConfigMap {
16+
17+
m := make(map[string]kafka.ConfigValue)
18+
19+
file, err := os.Open(configFile)
20+
if err != nil {
21+
fmt.Fprintf(os.Stderr, "Failed to open file: %s", err)
22+
os.Exit(1)
23+
}
24+
defer file.Close()
25+
26+
scanner := bufio.NewScanner(file)
27+
for scanner.Scan() {
28+
line := strings.TrimSpace(scanner.Text())
29+
if !strings.HasPrefix(line, "#") && len(line) != 0 {
30+
kv := strings.Split(line, "=")
31+
parameter := strings.TrimSpace(kv[0])
32+
value := strings.TrimSpace(kv[1])
33+
m[parameter] = value
34+
}
35+
}
36+
37+
if err := scanner.Err(); err != nil {
38+
fmt.Printf("Failed to read file: %s", err)
39+
os.Exit(1)
40+
}
41+
42+
return m
43+
44+
}
45+
46+
func main() {
47+
conf := ReadConfig("./producer.conf")
48+
49+
topic := "test"
50+
p, err := kafka.NewProducer(&conf)
51+
var mu sync.Mutex
52+
53+
if err != nil {
54+
fmt.Printf("Failed to create producer: %s", err)
55+
os.Exit(1)
56+
}
57+
var wg sync.WaitGroup
58+
var cnt int64
59+
60+
// Go-routine to handle message delivery reports and
61+
// possibly other event types (errors, stats, etc)
62+
go func() {
63+
for e := range p.Events() {
64+
switch ev := e.(type) {
65+
case *kafka.Message:
66+
if ev.TopicPartition.Error != nil {
67+
fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
68+
} else {
69+
fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
70+
*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
71+
}
72+
}
73+
}
74+
}()
75+
76+
for j := 0; j < 10; j++ {
77+
wg.Add(1)
78+
go func(j int) {
79+
var value string
80+
for i := 0; i < 10000; i++ {
81+
key := ""
82+
now := time.Now()
83+
value = fmt.Sprintf("%02d-%04d-%s", j, i, now.Format("15:04:05"))
84+
mu.Lock()
85+
p.Produce(&kafka.Message{
86+
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
87+
Key: []byte(key),
88+
Value: []byte(value),
89+
}, nil)
90+
mu.Unlock()
91+
atomic.AddInt64(&cnt, 1)
92+
}
93+
wg.Done()
94+
}(j)
95+
}
96+
97+
wg.Wait()
98+
// Wait for all messages to be delivered
99+
time.Sleep(10 * time.Second)
100+
p.Close()
101+
}

0 commit comments

Comments
 (0)