@@ -23,12 +23,12 @@ package kafka
2323import (
2424 "encoding/json"
2525 "errors"
26+ "fmt"
2627 "time"
2728
2829 "github.com/Shopify/sarama"
2930 "github.com/kelseyhightower/envconfig"
3031 "github.com/kubernetes/helm/pkg/strvals"
31- "github.com/mitchellh/mapstructure"
3232 "gopkg.in/guregu/null.v3"
3333
3434 "go.k6.io/k6/lib/types"
@@ -54,24 +54,6 @@ type Config struct {
5454 InfluxDBConfig influxdbConfig `json:"influxdb"`
5555}
5656
57- // config is a duplicate of ConfigFields as we can not mapstructure.Decode into
58- // null types so we duplicate the struct with primitive types to Decode into
59- type config struct {
60- Brokers []string `json:"brokers" mapstructure:"brokers" envconfig:"K6_KAFKA_BROKERS"`
61- Topic string `json:"topic" mapstructure:"topic" envconfig:"K6_KAFKA_TOPIC"`
62- Format string `json:"format" mapstructure:"format" envconfig:"K6_KAFKA_FORMAT"`
63- PushInterval string `json:"pushInterval" mapstructure:"pushInterval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
64- User string `json:"user" mapstructure:"user" envconfig:"K6_KAFKA_SASL_USER"`
65- Password string `json:"password" mapstructure:"password" envconfig:"K6_KAFKA_SASL_PASSWORD"`
66- AuthMechanism string `json:"authMechanism" mapstructure:"authMechanism" envconfig:"K6_KAFKA_AUTH_MECHANISM"`
67-
68- InfluxDBConfig influxdbConfig `json:"influxdb" mapstructure:"influxdb"`
69- Version string `json:"version" mapstructure:"version"`
70- SSL bool `json:"ssl" mapstructure:"ssl"`
71- Insecure bool `json:"insecureSkipTLSVerify" mapstructure:"insecure"`
72- LogError bool `json:"logError" mapstructure:"logError"`
73- }
74-
7557// NewConfig creates a new Config instance with default values for some fields.
7658func NewConfig () Config {
7759 return Config {
@@ -135,10 +117,6 @@ func ParseArg(arg string) (Config, error) {
135117 return c , err
136118 }
137119
138- if v , ok := params ["brokers" ].(string ); ok {
139- params ["brokers" ] = []string {v }
140- }
141-
142120 if v , ok := params ["influxdb" ].(map [string ]interface {}); ok {
143121 influxConfig , err := influxdbParseMap (v )
144122 if err != nil {
@@ -154,49 +132,85 @@ func ParseArg(arg string) (Config, error) {
154132 if err != nil {
155133 return c , err
156134 }
135+ delete (params , "pushInterval" )
157136 }
158137
159138 if v , ok := params ["version" ].(string ); ok {
160139 c .Version = null .StringFrom (v )
140+ delete (params , "version" )
161141 }
162142
163143 if v , ok := params ["ssl" ].(bool ); ok {
164144 c .SSL = null .BoolFrom (v )
145+ delete (params , "ssl" )
165146 }
166147
167148 if v , ok := params ["insecureSkipTLSVerify" ].(bool ); ok {
168149 c .InsecureSkipTLSVerify = null .BoolFrom (v )
150+ delete (params , "insecureSkipTLSVerify" )
169151 }
170152
171153 if v , ok := params ["logError" ].(bool ); ok {
172154 c .LogError = null .BoolFrom (v )
155+ delete (params , "logError" )
173156 }
174157
175158 if v , ok := params ["authMechanism" ].(string ); ok {
176159 c .AuthMechanism = null .StringFrom (v )
160+ delete (params , "authMechanism" )
177161 }
178162
179163 if v , ok := params ["user" ].(string ); ok {
180164 c .User = null .StringFrom (v )
165+ delete (params , "user" )
181166 }
182167
183168 if v , ok := params ["password" ].(string ); ok {
184169 c .Password = null .StringFrom (v )
170+ delete (params , "password" )
185171 }
172+ if v , ok := params ["topic" ].(string ); ok {
173+ c .Topic = null .StringFrom (v )
174+ delete (params , "topic" )
175+ }
176+ if v , ok := params ["format" ].(string ); ok {
177+ c .Format = null .StringFrom (v )
186178
187- var cfg config
188- err = mapstructure .Decode (params , & cfg )
189- if err != nil {
190- return c , err
179+ delete (params , "format" )
191180 }
192181
193- c .Brokers = cfg .Brokers
194- c .Topic = null .StringFrom (cfg .Topic )
195- c .Format = null .StringFrom (cfg .Format )
182+ if v , ok := params ["brokers" ].(string ); ok {
183+ c .Brokers = []string {v }
196184
185+ delete (params , "brokers" )
186+ }
187+ if v , ok := params ["brokers" ].([]interface {}); ok {
188+ c .Brokers = interfaceSliceToStringSlice (v )
189+ delete (params , "brokers" )
190+ }
191+
192+ if len (params ) > 0 {
193+ return c , errors .New ("Unknown or unparsed options '" + mapToString (params ) + "'" )
194+ }
197195 return c , nil
198196}
199197
198+ func mapToString (m map [string ]interface {}) string {
199+ var s string
200+ for k , v := range m {
201+ s += fmt .Sprintf ("%s=%v," , k , v )
202+ }
203+ return s [:len (s )- 1 ]
204+ }
205+
206+ func interfaceSliceToStringSlice (input []interface {}) []string {
207+ output := make ([]string , len (input ))
208+ for i , v := range input {
209+ output [i ] = fmt .Sprintf ("%v" , v )
210+ }
211+ return output
212+ }
213+
200214// GetConsolidatedConfig combines {default config values + JSON config +
201215// environment vars + arg config values}, and returns the final result.
202216func GetConsolidatedConfig (jsonRawConf json.RawMessage , env map [string ]string , arg string ) (Config , error ) {
0 commit comments