Skip to content

Commit c56e1ae

Browse files
authored
Ensure schemaId initialization is thread-safe (#2540)
* Ensure schemaId initialization is thread-safe * Minor cleanup
1 parent 6c0863b commit c56e1ae

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
237237

238238
if (latestSchema != null)
239239
{
240-
schemaId = new SchemaId(SchemaType.Protobuf, latestSchema.Id, latestSchema.Guid);
240+
schemaId = new SchemaId(SchemaType.Protobuf, latestSchema.Id, latestSchema.Guid, indexArray);
241241
}
242242
else if (!subjectsRegistered.Contains(subject))
243243
{
@@ -259,7 +259,7 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema
259259
// note: different values for schemaId should never be seen here.
260260
// TODO: but fail fast may be better here.
261261

262-
schemaId = new SchemaId(SchemaType.Protobuf, outputSchema.Id, outputSchema.Guid);
262+
schemaId = new SchemaId(SchemaType.Protobuf, outputSchema.Id, outputSchema.Guid, indexArray);
263263
subjectsRegistered.Add(subject);
264264
}
265265
}
@@ -282,8 +282,6 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema
282282
.ConfigureAwait(continueOnCapturedContext: false);
283283
}
284284

285-
schemaId.MessageIndexes = indexArray;
286-
287285
var buffer = new byte[value.CalculateSize()];
288286
value.WriteTo(buffer);
289287
buffer = await ExecuteRules(context.Component == MessageComponentType.Key,

src/Confluent.SchemaRegistry/SchemaId.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ public SchemaId(SchemaType schemaType, int id, string guid)
8080
MessageIndexes = null;
8181
}
8282

83+
public SchemaId(SchemaType schemaType, int id, string guid, IReadOnlyList<int> messageIndexes)
84+
{
85+
SchemaType = schemaType;
86+
Id = id;
87+
Guid = guid != null ? System.Guid.Parse(guid) : null;
88+
MessageIndexes = messageIndexes;
89+
}
90+
8391
[MethodImpl(MethodImplOptions.AggressiveInlining)]
8492
public ReadOnlyMemory<byte> FromBytes(ReadOnlyMemory<byte> payload)
8593
{

0 commit comments

Comments
 (0)