Skip to content

Commit cabcec5

Browse files
feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.
Signed-off-by: devendrapohekar-scout <devendra.pohekar@scoutmotors.com>
1 parent 2eddb15 commit cabcec5

File tree

6 files changed

+60
-25
lines changed

6 files changed

+60
-25
lines changed

bindings/aws/kinesis/kinesis.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import (
2727
"github.com/aws/aws-sdk-go/service/kinesis"
2828
"github.com/cenkalti/backoff/v4"
2929
"github.com/google/uuid"
30-
"github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
31-
"github.com/vmware/vmware-go-kcl/clientlibrary/worker"
30+
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
31+
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
3232

3333
"github.com/dapr/components-contrib/bindings"
3434
awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
@@ -68,6 +68,7 @@ type kinesisMetadata struct {
6868
SecretKey string `json:"secretKey" mapstructure:"secretKey"`
6969
SessionToken string `json:"sessionToken" mapstructure:"sessionToken"`
7070
KinesisConsumerMode string `json:"mode" mapstructure:"mode"`
71+
ApplicationName string `json:"applicationName" mapstructure:"applicationName"`
7172
}
7273

7374
const (
@@ -119,6 +120,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error
119120
a.consumerMode = m.KinesisConsumerMode
120121
a.streamName = m.StreamName
121122
a.consumerName = m.ConsumerName
123+
a.applicationName = m.ApplicationName
122124
a.metadata = m
123125

124126
opts := awsAuth.Options{
@@ -161,19 +163,21 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
161163
return errors.New("binding is closed")
162164
}
163165

164-
if a.metadata.KinesisConsumerMode == SharedThroughput {
166+
switch a.metadata.KinesisConsumerMode {
167+
case SharedThroughput:
168+
// initalize worker configuration
169+
config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName)
165170
// Configure the KCL worker with custom endpoints for LocalStack
166-
config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode)
167171
if a.metadata.Endpoint != "" {
168-
config.KinesisEndpoint = a.metadata.Endpoint
169-
config.DynamoDBEndpoint = a.metadata.Endpoint
172+
config.WithKinesisEndpoint(a.metadata.Endpoint)
173+
config.WithDynamoDBEndpoint(a.metadata.Endpoint)
170174
}
171175
a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config)
172176
err = a.worker.Start()
173177
if err != nil {
174178
return err
175179
}
176-
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
180+
case ExtendedFanout:
177181
var stream *kinesis.DescribeStreamOutput
178182
stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName})
179183
if err != nil {
@@ -197,9 +201,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
197201
case <-ctx.Done():
198202
case <-a.closeCh:
199203
}
200-
if a.metadata.KinesisConsumerMode == SharedThroughput {
204+
switch a.metadata.KinesisConsumerMode {
205+
case SharedThroughput:
201206
a.worker.Shutdown()
202-
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
207+
case ExtendedFanout:
203208
a.deregisterConsumer(ctx, stream, a.consumerARN)
204209
}
205210
}()

bindings/aws/kinesis/kinesis_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ import (
2525
func TestParseMetadata(t *testing.T) {
2626
m := bindings.Metadata{}
2727
m.Properties = map[string]string{
28-
"accessKey": "key",
29-
"region": "region",
30-
"secretKey": "secret",
31-
"consumerName": "test",
32-
"streamName": "stream",
33-
"mode": "extended",
34-
"endpoint": "endpoint",
35-
"sessionToken": "token",
28+
"accessKey": "key",
29+
"region": "region",
30+
"secretKey": "secret",
31+
"consumerName": "test",
32+
"streamName": "stream",
33+
"mode": "extended",
34+
"endpoint": "endpoint",
35+
"sessionToken": "token",
36+
"applicationName": "applicationName",
3637
}
3738
kinesis := AWSKinesis{}
3839
meta, err := kinesis.parseMetadata(m)
@@ -45,4 +46,5 @@ func TestParseMetadata(t *testing.T) {
4546
assert.Equal(t, "endpoint", meta.Endpoint)
4647
assert.Equal(t, "token", meta.SessionToken)
4748
assert.Equal(t, "extended", meta.KinesisConsumerMode)
49+
assert.Equal(t, "applicationName", meta.ApplicationName)
4850
}

common/authentication/aws/client.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import (
3737
"github.com/aws/aws-sdk-go/service/ssm"
3838
"github.com/aws/aws-sdk-go/service/ssm/ssmiface"
3939
"github.com/aws/aws-sdk-go/service/sts"
40-
"github.com/vmware/vmware-go-kcl/clientlibrary/config"
40+
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
4141
)
4242

4343
type Clients struct {
@@ -182,18 +182,28 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string
182182
stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{
183183
StreamName: aws.String(streamName),
184184
})
185+
<<<<<<< HEAD
185186
if stream != nil {
186187
return stream.StreamDescription.StreamARN, err
188+
=======
189+
/**
190+
* If the error is not nil, do not proceed to the next step
191+
* as it may cause a nil pointer error on stream.StreamDescription.StreamARN.
192+
*/
193+
if err != nil {
194+
return nil, err
195+
>>>>>>> 623adfcb (feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.)
187196
}
188197
}
189198

190199
return nil, errors.New("unable to get stream arn due to empty client")
191200
}
192201

193-
func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode string) *config.KinesisClientLibConfiguration {
202+
func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration {
194203
const sharedMode = "shared"
195204
if c.Kinesis != nil {
196205
if mode == sharedMode {
206+
<<<<<<< HEAD
197207
<<<<<<< HEAD
198208
if c.Credentials != nil {
199209
kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer,
@@ -219,9 +229,17 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode s
219229
kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds)
220230
return kclConfig
221231
>>>>>>> 9b9f2b98 (make the v2 creds provider be the default)
232+
=======
233+
v1Creds, err := c.Credentials.Get()
234+
if err != nil {
235+
return nil
236+
}
237+
v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken)
238+
kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds)
239+
return kclConfig
240+
>>>>>>> 623adfcb (feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.)
222241
}
223242
}
224-
225243
return nil
226244
}
227245

common/authentication/aws/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func TestKinesisClients_WorkerCfg(t *testing.T) {
251251

252252
for _, tt := range tests {
253253
t.Run(tt.name, func(t *testing.T) {
254-
cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.consumer, tt.mode)
254+
cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.kinesisClient.Region, tt.mode, tt.consumer)
255255
if tt.expectedConfig == nil {
256256
assert.Equal(t, tt.expectedConfig, cfg)
257257
return

go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ require (
150150
sigs.k8s.io/yaml v1.4.0
151151
)
152152

153+
require (
154+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 // indirect
155+
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 // indirect
156+
)
157+
153158
require (
154159
cel.dev/expr v0.23.0 // indirect
155160
cloud.google.com/go v0.120.0 // indirect
@@ -203,7 +208,6 @@ require (
203208
github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect
204209
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect
205210
github.com/aws/smithy-go v1.22.5 // indirect
206-
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect
207211
github.com/benbjohnson/clock v1.3.5 // indirect
208212
github.com/beorn7/perks v1.0.1 // indirect
209213
github.com/bits-and-blooms/bitset v1.4.0 // indirect
@@ -392,6 +396,7 @@ require (
392396
github.com/tklauser/go-sysconf v0.3.12 // indirect
393397
github.com/tklauser/numcpus v0.6.1 // indirect
394398
github.com/valyala/bytebufferpool v1.0.0 // indirect
399+
github.com/vmware/vmware-go-kcl-v2 v1.0.0
395400
github.com/x448/float16 v0.8.4 // indirect
396401
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
397402
github.com/xdg-go/stringprep v1.0.4 // indirect

go.sum

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,12 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W
268268
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
269269
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a h1:QFemvMGPnajaeRBkFc1HoEA7qzVjUv+rkYb1/ps1/UE=
270270
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
271-
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
272271
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
273272
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
274273
github.com/aws/aws-sdk-go v1.55.6 h1:cSg4pvZ3m8dgYcgqB97MrcdjUmZ1BeMYKUxMMB89IPk=
275274
github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
276275
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
276+
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
277277
github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
278278
github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0=
279279
github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0=
@@ -313,6 +313,9 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.17/go.mod
313313
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72HRZDLMtmVQiLG2tLfQcaWLCssELvGl+Zf2WVxMmR8=
314314
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 h1:t0E6FzREdtCsiLIoLCWsYliNsRBgyGD/MCK571qk4MI=
315315
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:ygpklyoaypuyDvOM5ujWGrYWpAK3h7ugnmKCU/76Ys4=
316+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
317+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
318+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
316319
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4 h1:BN6+zko+qO9Tl9S0ywUPNvY0gvlFK4Zmj2Y0a8paFkk=
317320
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4/go.mod h1:hbMVfSdZneCht4UmPOsejDt93QnetQPFuLOOqbuybqs=
318321
github.com/aws/aws-sdk-go-v2/service/sns v1.34.7 h1:OBuZE9Wt8h2imuRktu+WfjiTGrnYdCIJg8IX92aalHE=
@@ -332,8 +335,8 @@ github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZU
332335
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
333336
github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw=
334337
github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
335-
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo=
336-
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
338+
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
339+
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
337340
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
338341
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
339342
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@@ -1714,6 +1717,8 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
17141717
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
17151718
github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk=
17161719
github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo=
1720+
github.com/vmware/vmware-go-kcl-v2 v1.0.0 h1:HPT5vu+khRmGspBSc/+AilEWbRGoTZhjlYqdrBbRMZs=
1721+
github.com/vmware/vmware-go-kcl-v2 v1.0.0/go.mod h1:GBDu+P4Neo0vwZAk0ZUCEC8GYsUOWvi3XhFwAZR3SjA=
17171722
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
17181723
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
17191724
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=

0 commit comments

Comments
 (0)