Skip to content

Commit 4414756

Browse files
authored
DGS-22077 Handle evolution during field transformation (#2541)
* DGS-22077 Handle evolution during field transformation * Minor rename * Fix test * Add new header
1 parent c56e1ae commit 4414756

File tree

3 files changed

+68
-7
lines changed

3 files changed

+68
-7
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/AvroUtils.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,23 +114,40 @@ public static async Task<object> Transform(RuleContext ctx, Avro.Schema schema,
114114
return await Utils.TransformDictionaryAsync(message, mapTransformer).ConfigureAwait(false);
115115
case Avro.Schema.Type.Record:
116116
RecordSchema rs = (RecordSchema)schema;
117+
if (message is ISpecificRecord)
118+
{
119+
ISpecificRecord specificRecord = (ISpecificRecord)message;
120+
rs = (RecordSchema)specificRecord.Schema;
121+
}
122+
else if (message is GenericRecord)
123+
{
124+
GenericRecord genericRecord = (GenericRecord)message;
125+
rs = (RecordSchema)genericRecord.Schema;
126+
}
117127
foreach (Field f in rs.Fields)
118128
{
129+
Field originalField = null;
130+
if (!((RecordSchema)schema).TryGetField(f.Name, out originalField))
131+
{
132+
originalField = f;
133+
}
134+
119135
string fullName = rs.Fullname + "." + f.Name;
120-
using (ctx.EnterField(message, fullName, f.Name, GetType(f.Schema), GetInlineTags(f)))
136+
using (ctx.EnterField(message, fullName, f.Name, GetType(originalField.Schema), GetInlineTags(originalField)))
121137
{
122138
if (message is ISpecificRecord)
123139
{
124140
ISpecificRecord specificRecord = (ISpecificRecord)message;
125141
object value = specificRecord.Get(f.Pos);
126-
object newValue = await Transform(ctx, f.Schema, value, fieldTransform).ConfigureAwait(false);
142+
object newValue = await Transform(ctx, originalField.Schema, value, fieldTransform).ConfigureAwait(false);
127143
if (ctx.Rule.Kind == RuleKind.Condition)
128144
{
129145
if (newValue is bool b && !b)
130146
{
131147
throw new RuleConditionException(ctx.Rule);
132148
}
133-
} else
149+
}
150+
else
134151
{
135152
specificRecord.Put(f.Pos, newValue);
136153
}
@@ -139,7 +156,7 @@ public static async Task<object> Transform(RuleContext ctx, Avro.Schema schema,
139156
{
140157
GenericRecord genericRecord = (GenericRecord)message;
141158
object value = genericRecord.GetValue(f.Pos);
142-
object newValue = await Transform(ctx, f.Schema, value, fieldTransform).ConfigureAwait(false);
159+
object newValue = await Transform(ctx, originalField.Schema, value, fieldTransform).ConfigureAwait(false);
143160
if (ctx.Rule.Kind == RuleKind.Condition)
144161
{
145162
if (newValue is bool b && !b)

src/Confluent.SchemaRegistry/Rest/RestService.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ public class RestService : IRestService
3636

3737
private static readonly string acceptHeader = string.Join(", ", Versions.PreferredResponseTypes);
3838

39-
private static readonly string acceptVersionHeader = "8.0";
40-
4139
public const int DefaultMaxRetries = 3;
4240

4341
public const int DefaultRetriesWaitMs = 1000;
@@ -447,7 +445,7 @@ private async Task<HttpRequestMessage> CreateRequest(string endPoint, HttpMethod
447445
{
448446
HttpRequestMessage request = new HttpRequestMessage(method, endPoint);
449447
request.Headers.Add("Accept", acceptHeader);
450-
request.Headers.Add("Accept-Version", acceptVersionHeader);
448+
request.Headers.Add("Confluent-Accept-Unknown-Properties", "true");
451449
if (jsonBody.Length != 0)
452450
{
453451
string stringContent = string.Join("\n", jsonBody.Select(x => JsonConvert.SerializeObject(x)));

test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,52 @@ public void ISpecificRecordCELFieldTransform()
399399
Assert.Equal(user.favorite_number, result.favorite_number);
400400
}
401401

402+
[Fact]
403+
public void ISpecificRecordCELFieldTransformMissingProp()
404+
{
405+
var schemaStr =
406+
"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"Confluent.Kafka.Examples.AvroSpecific\",\"fields\":[" +
407+
"{\"name\":\"name\",\"type\":\"string\"}," +
408+
"{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}," +
409+
"{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}," +
410+
"{\"name\":\"missing\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
411+
412+
var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null);
413+
schema.RuleSet = new RuleSet(new List<Rule>(),
414+
new List<Rule>
415+
{
416+
new Rule("testCEL", RuleKind.Transform, RuleMode.WriteRead, "CEL_FIELD", null, null,
417+
"typeName == 'STRING' ; value + '-suffix'", null, null, false)
418+
}
419+
);
420+
store[schemaStr] = 1;
421+
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
422+
var config = new AvroSerializerConfig
423+
{
424+
AutoRegisterSchemas = false,
425+
UseLatestVersion = true
426+
};
427+
var serializer = new AvroSerializer<UserWithPic>(schemaRegistryClient, config);
428+
var deserializer = new AvroDeserializer<User>(schemaRegistryClient, null);
429+
430+
var pic = new byte[] { 1, 2, 3 };
431+
var user = new UserWithPic()
432+
{
433+
favorite_color = "blue",
434+
favorite_number = 100,
435+
name = "awesome",
436+
picture = pic
437+
};
438+
439+
Headers headers = new Headers();
440+
var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
441+
var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
442+
443+
Assert.Equal("awesome-suffix-suffix", result.name);
444+
Assert.Equal("blue-suffix-suffix", result.favorite_color);
445+
Assert.Equal(user.favorite_number, result.favorite_number);
446+
}
447+
402448
[Fact]
403449
public void ISpecificRecordCELFieldTransformDisable()
404450
{

0 commit comments

Comments
 (0)