Skip to content

Commit f73aef6

Browse files
authored
Merge branch 'main' into main
2 parents e20aaab + de92aa7 commit f73aef6

File tree

15 files changed

+572
-101
lines changed

15 files changed

+572
-101
lines changed

bindings/kafka/metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,13 @@ metadata:
352352
It allows sending headers with special characters that are usually not allowed in HTTP headers.
353353
example: "true"
354354
default: "false"
355+
- name: useAvroJSON
356+
type: bool
357+
required: false
358+
description: |
359+
Enables Avro JSON schema for serialization. Only applicable when the subscription uses valueSchemaType=Avro
360+
example: "true"
361+
default: "false"
355362
- name: compression
356363
type: string
357364
required: false

common/authentication/aws/client_fake.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,18 @@ func (m *MockParameterStore) DescribeParametersWithContext(ctx context.Context,
4343
type MockSecretManager struct {
4444
GetSecretValueFn func(context.Context, *secretsmanager.GetSecretValueInput, ...request.Option) (*secretsmanager.GetSecretValueOutput, error)
4545
secretsmanageriface.SecretsManagerAPI
46+
47+
ListSecretsFn func(context.Context, *secretsmanager.ListSecretsInput, ...request.Option) (*secretsmanager.ListSecretsOutput, error)
4648
}
4749

4850
func (m *MockSecretManager) GetSecretValueWithContext(ctx context.Context, input *secretsmanager.GetSecretValueInput, option ...request.Option) (*secretsmanager.GetSecretValueOutput, error) {
4951
return m.GetSecretValueFn(ctx, input, option...)
5052
}
5153

54+
func (m *MockSecretManager) ListSecretsWithContext(ctx context.Context, input *secretsmanager.ListSecretsInput, option ...request.Option) (*secretsmanager.ListSecretsOutput, error) {
55+
return m.ListSecretsFn(ctx, input, option...)
56+
}
57+
5258
type MockDynamoDB struct {
5359
GetItemWithContextFn func(ctx context.Context, input *dynamodb.GetItemInput, op ...request.Option) (*dynamodb.GetItemOutput, error)
5460
PutItemWithContextFn func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (*dynamodb.PutItemOutput, error)

common/component/kafka/kafka.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,10 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
223223
}
224224
k.consumeRetryEnabled = meta.ConsumeRetryEnabled
225225
k.consumeRetryInterval = meta.ConsumeRetryInterval
226-
227226
if meta.SchemaRegistryURL != "" {
228227
k.logger.Infof("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL)
229228
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
229+
k.srClient.CodecJsonEnabled(!meta.UseAvroJSON)
230230
// Empty password is a possibility
231231
if meta.SchemaRegistryAPIKey != "" {
232232
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
@@ -364,12 +364,7 @@ func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config Subscri
364364
if err != nil {
365365
return nil, err
366366
}
367-
// The data coming through is standard JSON. The version currently supported by srclient doesn't support this yet
368-
// Use this specific codec instead.
369-
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
370-
if err != nil {
371-
return nil, err
372-
}
367+
codec := schema.Codec() // The value returned in Avro JSON format
373368
native, _, err := codec.NativeFromBinary(message.Value[5:])
374369
if err != nil {
375370
return nil, err
@@ -405,12 +400,8 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec,
405400
if errSchema != nil {
406401
return nil, nil, errSchema
407402
}
408-
// New JSON standard serialization/Deserialization is not integrated in srclient yet.
409-
// Since standard json is passed from dapr, it is needed.
410-
codec, errCodec := goavro.NewCodecForStandardJSONFull(schema.Schema())
411-
if errCodec != nil {
412-
return nil, nil, errCodec
413-
}
403+
codec := schema.Codec()
404+
414405
k.latestSchemaCacheWriteLock.Lock()
415406
k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
416407
k.latestSchemaCacheWriteLock.Unlock()
@@ -420,12 +411,7 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec,
420411
if err != nil {
421412
return nil, nil, err
422413
}
423-
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
424-
if err != nil {
425-
return nil, nil, err
426-
}
427-
428-
return schema, codec, nil
414+
return schema, schema.Codec(), nil
429415
}
430416

431417
func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {

0 commit comments

Comments
 (0)