diff --git a/go.mod b/go.mod index 6b4c053..3466d7d 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,9 @@ require ( github.com/redis/go-redis/v9 v9.5.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect diff --git a/kq/config.go b/kq/config.go index 6429dd8..f0b0187 100644 --- a/kq/config.go +++ b/kq/config.go @@ -23,4 +23,5 @@ type KqConf struct { Password string `json:",optional"` ForceCommit bool `json:",default=true"` CommitInOrder bool `json:",default=false"` + Mechanism string `json:",options=plain|scram-sha-256|scram-sha-512,default=plain"` } diff --git a/kq/queue.go b/kq/queue.go index b7f23ac..9662113 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -13,7 +13,9 @@ import ( "github.com/segmentio/kafka-go" _ "github.com/segmentio/kafka-go/gzip" _ "github.com/segmentio/kafka-go/lz4" + "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" _ "github.com/segmentio/kafka-go/snappy" "github.com/zeromicro/go-queue/kq/internal" "github.com/zeromicro/go-zero/core/contextx" @@ -102,14 +104,37 @@ func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.Mess q := kafkaQueues{ group: service.NewServiceGroup(), } + + var mechanism sasl.Mechanism + var err error + switch c.Mechanism { + case "plain": + mechanism = plain.Mechanism{ + Username: c.Username, + Password: c.Password, + } + case "scram-sha-256": + mechanism, err = scram.Mechanism(scram.SHA256, c.Username, c.Password) + if err != nil { + return nil, err + } + case "scram-sha-512": + mechanism, err = scram.Mechanism(scram.SHA512, c.Username, c.Password) + if err != nil { + return nil, err + } + default: + return nil, errors.New("unsupported mechanism: " + c.Mechanism) + } + for i := 0; i < c.Conns; i++ { - q.queues = append(q.queues, newKafkaQueue(c, handler, options)) + q.queues = append(q.queues, newKafkaQueue(c, handler, options, mechanism)) } return q, nil } -func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue { +func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions, mechanism sasl.Mechanism) queue.MessageQueue { var offset int64 if c.Offset == firstOffset { offset = kafka.FirstOffset @@ -130,10 +155,7 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue } if len(c.Username) > 0 && len(c.Password) > 0 { readerConfig.Dialer = &kafka.Dialer{ - SASLMechanism: plain.Mechanism{ - Username: c.Username, - Password: c.Password, - }, + SASLMechanism: mechanism, } } if len(c.CaFile) > 0 {