Skip to content

Commit 623adfc

Browse files
feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.
Signed-off-by: devendrapohekar-scout <devendra.pohekar@scoutmotors.com>
1 parent 77df752 commit 623adfc

File tree

6 files changed

+55
-35
lines changed

6 files changed

+55
-35
lines changed

bindings/aws/kinesis/kinesis.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import (
2727
"github.com/aws/aws-sdk-go/service/kinesis"
2828
"github.com/cenkalti/backoff/v4"
2929
"github.com/google/uuid"
30-
"github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
31-
"github.com/vmware/vmware-go-kcl/clientlibrary/worker"
30+
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
31+
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
3232

3333
"github.com/dapr/components-contrib/bindings"
3434
awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
@@ -50,9 +50,10 @@ type AWSKinesis struct {
5050
logger logger.Logger
5151
consumerMode string
5252

53-
closed atomic.Bool
54-
closeCh chan struct{}
55-
wg sync.WaitGroup
53+
closed atomic.Bool
54+
closeCh chan struct{}
55+
wg sync.WaitGroup
56+
applicationName string
5657
}
5758

5859
// TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml
@@ -65,6 +66,7 @@ type kinesisMetadata struct {
6566
SecretKey string `json:"secretKey" mapstructure:"secretKey"`
6667
SessionToken string `json:"sessionToken" mapstructure:"sessionToken"`
6768
KinesisConsumerMode string `json:"mode" mapstructure:"mode"`
69+
ApplicationName string `json:"applicationName" mapstructure:"applicationName"`
6870
}
6971

7072
const (
@@ -116,6 +118,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error
116118
a.consumerMode = m.KinesisConsumerMode
117119
a.streamName = m.StreamName
118120
a.consumerName = m.ConsumerName
121+
a.applicationName = m.ApplicationName
119122
a.metadata = m
120123

121124
opts := awsAuth.Options{
@@ -158,19 +161,21 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
158161
return errors.New("binding is closed")
159162
}
160163

161-
if a.metadata.KinesisConsumerMode == SharedThroughput {
164+
switch a.metadata.KinesisConsumerMode {
165+
case SharedThroughput:
166+
// initalize worker configuration
167+
config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName)
162168
// Configure the KCL worker with custom endpoints for LocalStack
163-
config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode)
164169
if a.metadata.Endpoint != "" {
165-
config.KinesisEndpoint = a.metadata.Endpoint
166-
config.DynamoDBEndpoint = a.metadata.Endpoint
170+
config.WithKinesisEndpoint(a.metadata.Endpoint)
171+
config.WithDynamoDBEndpoint(a.metadata.Endpoint)
167172
}
168173
a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config)
169174
err = a.worker.Start()
170175
if err != nil {
171176
return err
172177
}
173-
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
178+
case ExtendedFanout:
174179
var stream *kinesis.DescribeStreamOutput
175180
stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName})
176181
if err != nil {
@@ -202,9 +207,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
202207
case <-ctx.Done():
203208
case <-a.closeCh:
204209
}
205-
if a.metadata.KinesisConsumerMode == SharedThroughput {
210+
switch a.metadata.KinesisConsumerMode {
211+
case SharedThroughput:
206212
a.worker.Shutdown()
207-
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
213+
case ExtendedFanout:
208214
a.deregisterConsumer(ctx, stream, a.consumerARN)
209215
}
210216
}()

bindings/aws/kinesis/kinesis_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ import (
2525
func TestParseMetadata(t *testing.T) {
2626
m := bindings.Metadata{}
2727
m.Properties = map[string]string{
28-
"accessKey": "key",
29-
"region": "region",
30-
"secretKey": "secret",
31-
"consumerName": "test",
32-
"streamName": "stream",
33-
"mode": "extended",
34-
"endpoint": "endpoint",
35-
"sessionToken": "token",
28+
"accessKey": "key",
29+
"region": "region",
30+
"secretKey": "secret",
31+
"consumerName": "test",
32+
"streamName": "stream",
33+
"mode": "extended",
34+
"endpoint": "endpoint",
35+
"sessionToken": "token",
36+
"applicationName": "applicationName",
3637
}
3738
kinesis := AWSKinesis{}
3839
meta, err := kinesis.parseMetadata(m)
@@ -45,4 +46,5 @@ func TestParseMetadata(t *testing.T) {
4546
assert.Equal(t, "endpoint", meta.Endpoint)
4647
assert.Equal(t, "token", meta.SessionToken)
4748
assert.Equal(t, "extended", meta.KinesisConsumerMode)
49+
assert.Equal(t, "applicationName", meta.ApplicationName)
4850
}

common/authentication/aws/client.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import (
1818
"errors"
1919
"sync"
2020

21-
"github.com/aws/aws-sdk-go/aws"
21+
"github.com/aws/aws-sdk-go-v2/aws"
22+
v2creds "github.com/aws/aws-sdk-go-v2/credentials"
2223
"github.com/aws/aws-sdk-go/aws/credentials"
2324
"github.com/aws/aws-sdk-go/aws/session"
2425
"github.com/aws/aws-sdk-go/service/dynamodb"
@@ -36,7 +37,7 @@ import (
3637
"github.com/aws/aws-sdk-go/service/ssm"
3738
"github.com/aws/aws-sdk-go/service/ssm/ssmiface"
3839
"github.com/aws/aws-sdk-go/service/sts"
39-
"github.com/vmware/vmware-go-kcl/clientlibrary/config"
40+
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
4041
)
4142

4243
type Clients struct {
@@ -184,7 +185,8 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string
184185
/**
185186
* If the error is not nil, do not proceed to the next step
186187
* as it may cause a nil pointer error on stream.StreamDescription.StreamARN.
187-
*/ if err != nil {
188+
*/
189+
if err != nil {
188190
return nil, err
189191
}
190192
return stream.StreamDescription.StreamARN, err
@@ -193,19 +195,19 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string
193195
return nil, errors.New("unable to get stream arn due to empty client")
194196
}
195197

196-
func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode string) *config.KinesisClientLibConfiguration {
198+
func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration {
197199
const sharedMode = "shared"
198200
if c.Kinesis != nil {
199201
if mode == sharedMode {
200-
if c.Credentials != nil {
201-
kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer,
202-
stream, c.Region, consumer,
203-
c.Credentials)
204-
return kclConfig
202+
v1Creds, err := c.Credentials.Get()
203+
if err != nil {
204+
return nil
205205
}
206+
v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken)
207+
kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds)
208+
return kclConfig
206209
}
207210
}
208-
209211
return nil
210212
}
211213

common/authentication/aws/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func TestKinesisClients_WorkerCfg(t *testing.T) {
251251

252252
for _, tt := range tests {
253253
t.Run(tt.name, func(t *testing.T) {
254-
cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.consumer, tt.mode)
254+
cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.kinesisClient.Region, tt.mode, tt.consumer)
255255
if tt.expectedConfig == nil {
256256
assert.Equal(t, tt.expectedConfig, cfg)
257257
return

go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ require (
150150
sigs.k8s.io/yaml v1.4.0
151151
)
152152

153+
require (
154+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 // indirect
155+
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 // indirect
156+
)
157+
153158
require (
154159
cel.dev/expr v0.23.0 // indirect
155160
cloud.google.com/go v0.120.0 // indirect
@@ -203,7 +208,6 @@ require (
203208
github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect
204209
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect
205210
github.com/aws/smithy-go v1.22.5 // indirect
206-
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect
207211
github.com/benbjohnson/clock v1.3.5 // indirect
208212
github.com/beorn7/perks v1.0.1 // indirect
209213
github.com/bits-and-blooms/bitset v1.4.0 // indirect
@@ -392,6 +396,7 @@ require (
392396
github.com/tklauser/go-sysconf v0.3.12 // indirect
393397
github.com/tklauser/numcpus v0.6.1 // indirect
394398
github.com/valyala/bytebufferpool v1.0.0 // indirect
399+
github.com/vmware/vmware-go-kcl-v2 v1.0.0
395400
github.com/x448/float16 v0.8.4 // indirect
396401
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
397402
github.com/xdg-go/stringprep v1.0.4 // indirect

go.sum

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,12 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W
268268
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
269269
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a h1:QFemvMGPnajaeRBkFc1HoEA7qzVjUv+rkYb1/ps1/UE=
270270
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
271-
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
272271
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
273272
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
274273
github.com/aws/aws-sdk-go v1.55.6 h1:cSg4pvZ3m8dgYcgqB97MrcdjUmZ1BeMYKUxMMB89IPk=
275274
github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
276275
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
276+
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
277277
github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
278278
github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0=
279279
github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0=
@@ -313,6 +313,9 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.17/go.mod
313313
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72HRZDLMtmVQiLG2tLfQcaWLCssELvGl+Zf2WVxMmR8=
314314
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 h1:t0E6FzREdtCsiLIoLCWsYliNsRBgyGD/MCK571qk4MI=
315315
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:ygpklyoaypuyDvOM5ujWGrYWpAK3h7ugnmKCU/76Ys4=
316+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
317+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
318+
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
316319
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4 h1:BN6+zko+qO9Tl9S0ywUPNvY0gvlFK4Zmj2Y0a8paFkk=
317320
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4/go.mod h1:hbMVfSdZneCht4UmPOsejDt93QnetQPFuLOOqbuybqs=
318321
github.com/aws/aws-sdk-go-v2/service/sns v1.34.7 h1:OBuZE9Wt8h2imuRktu+WfjiTGrnYdCIJg8IX92aalHE=
@@ -332,8 +335,8 @@ github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZU
332335
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
333336
github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw=
334337
github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
335-
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo=
336-
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
338+
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
339+
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
337340
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
338341
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
339342
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@@ -1714,6 +1717,8 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
17141717
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
17151718
github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk=
17161719
github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo=
1720+
github.com/vmware/vmware-go-kcl-v2 v1.0.0 h1:HPT5vu+khRmGspBSc/+AilEWbRGoTZhjlYqdrBbRMZs=
1721+
github.com/vmware/vmware-go-kcl-v2 v1.0.0/go.mod h1:GBDu+P4Neo0vwZAk0ZUCEC8GYsUOWvi3XhFwAZR3SjA=
17171722
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
17181723
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
17191724
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=

0 commit comments

Comments
 (0)