@@ -1,27 +1,38 @@
 package com.devshawn.kafka.gitops.service;
 
-
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.common.config.SaslConfigs;
 import com.devshawn.kafka.gitops.config.SchemaRegistryConfig;
 import com.devshawn.kafka.gitops.domain.plan.SchemaPlan;
+import com.devshawn.kafka.gitops.domain.state.SchemaDetails;
+import com.devshawn.kafka.gitops.enums.SchemaType;
 import com.devshawn.kafka.gitops.exception.SchemaRegistryExecutionException;
+import com.devshawn.kafka.gitops.exception.ValidationException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.flipkart.zjsonpatch.JsonDiff;
+import io.confluent.kafka.schemaregistry.AbstractSchemaProvider;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.SchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider;
-import org.apache.kafka.common.config.SaslConfigs;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.*;
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 
 public class SchemaRegistryService {
 
@@ -55,15 +66,73 @@ public void deleteSubject(String subject, boolean isPermanent) {
 
     public int register(SchemaPlan schemaPlan) {
         final CachedSchemaRegistryClient cachedSchemaRegistryClient = createSchemaRegistryClient();
-        AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
-        Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(loadSchemaFromDisk(schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList());
+        ParsedSchema parsedSchema;
+        if (SchemaType.AVRO.toString().equalsIgnoreCase(schemaPlan.getSchemaDetails().get().getType())) {
+            AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
+            parsedSchema = avroSchemaProvider.parseSchema(
+                    loadSchemaFromDisk(schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList()).get();
+        } else if (SchemaType.JSON.toString().equalsIgnoreCase(schemaPlan.getSchemaDetails().get().getType())) {
+            JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
+            parsedSchema = jsonSchemaProvider.parseSchema(
+                    loadSchemaFromDisk(schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList()).get();
+        } else if (SchemaType.PROTOBUF.toString().equalsIgnoreCase(schemaPlan.getSchemaDetails().get().getType())) {
+            ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
+            parsedSchema = protobufSchemaProvider.parseSchema(
+                    loadSchemaFromDisk(schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList()).get();
+        } else {
+            throw new ValidationException("Unknown schema type: " + schemaPlan.getSchemaDetails().get().getType());
+        }
         try {
-            return cachedSchemaRegistryClient.register(schemaPlan.getName(), parsedSchema.get());
+            return cachedSchemaRegistryClient.register(schemaPlan.getName(), parsedSchema);
         } catch (IOException | RestClientException ex) {
             throw new SchemaRegistryExecutionException("Error thrown when attempting to register subject with schema registry", ex.getMessage());
         }
     }
 
+    public void validateSchema(SchemaDetails schemaDetails) {
+        if (schemaDetails.getType().equalsIgnoreCase(SchemaType.AVRO.toString())) {
+            AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
+            validateSchema(schemaDetails, avroSchemaProvider);
+        } else if (schemaDetails.getType().equalsIgnoreCase(SchemaType.JSON.toString())) {
+            JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
+            validateSchema(schemaDetails, jsonSchemaProvider);
+        } else if (schemaDetails.getType().equalsIgnoreCase(SchemaType.PROTOBUF.toString())) {
+            ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
+            validateSchema(schemaDetails, protobufSchemaProvider);
+        } else {
+            throw new ValidationException("Unknown schema type: " + schemaDetails.getType());
+        }
+    }
+
+    public void validateSchema(SchemaDetails schemaDetails, AbstractSchemaProvider schemaProvider) {
+        if (schemaDetails.getReferences().isEmpty()) {
+            Optional<ParsedSchema> parsedSchema = schemaProvider.parseSchema(loadSchemaFromDisk(schemaDetails.getFile()), Collections.emptyList());
+            if (!parsedSchema.isPresent()) {
+                throw new ValidationException(String.format("%s schema %s could not be parsed.", schemaProvider.schemaType(), schemaDetails.getFile()));
+            }
+        } else {
+            List<SchemaReference> schemaReferences = new ArrayList<>();
+            schemaDetails.getReferences().forEach(referenceDetails -> {
+                SchemaReference schemaReference = new SchemaReference(referenceDetails.getName(), referenceDetails.getSubject(), referenceDetails.getVersion());
+                schemaReferences.add(schemaReference);
+            });
+            // Pass a schema registry client in the provider config: reference validation resolves referenced subjects against the registry's current state.
+            schemaProvider.configure(Collections.singletonMap(SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, createSchemaRegistryClient()));
+            // Check the parse result after the try block so the ValidationException below is not swallowed and re-wrapped by the RuntimeException catch.
+            Optional<ParsedSchema> parsedSchema;
+            try {
+                parsedSchema = schemaProvider.parseSchema(loadSchemaFromDisk(schemaDetails.getFile()), schemaReferences);
+            } catch (IllegalStateException ex) {
+                throw new ValidationException(String.format("Reference validation error: %s", ex.getMessage()));
+            } catch (RuntimeException ex) {
+                throw new ValidationException(String.format("Error thrown when attempting to validate %s schema with reference: %s", schemaProvider.schemaType(), ex.getMessage()));
+            }
+            if (!parsedSchema.isPresent()) {
+                throw new ValidationException(String.format("%s schema %s could not be parsed.", schemaProvider.schemaType(), schemaDetails.getFile()));
+            }
+        }
+    }
+
     public SchemaMetadata getLatestSchemaMetadata(String subject) {
         final CachedSchemaRegistryClient cachedSchemaRegistryClient = createSchemaRegistryClient();
         try {
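
The `register()` change repeats the same parse-and-unwrap sequence once per schema type, and `validateSchema()` repeats the same dispatch. A table-driven mapping from type to provider would keep that knowledge in one place. Below is a minimal standalone sketch of that alternative (the class and method names are illustrative, not part of this change), using only the Confluent provider classes the diff already imports:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;

// Illustrative sketch, not part of this PR: one provider instance per
// supported schema type, so register() and validateSchema() could share
// the same lookup instead of duplicating an if/else chain.
public class ProviderDispatchSketch {

    private static final Map<String, SchemaProvider> PROVIDERS = new HashMap<>();

    static {
        PROVIDERS.put("AVRO", new AvroSchemaProvider());
        PROVIDERS.put("JSON", new JsonSchemaProvider());
        PROVIDERS.put("PROTOBUF", new ProtobufSchemaProvider());
    }

    public static ParsedSchema parse(String type, String schemaString) {
        SchemaProvider provider = PROVIDERS.get(type.toUpperCase());
        if (provider == null) {
            throw new IllegalArgumentException("Unknown schema type: " + type);
        }
        // parseSchema returns Optional.empty() when the schema text is invalid.
        Optional<ParsedSchema> parsed =
                provider.parseSchema(schemaString, Collections.emptyList());
        return parsed.orElseThrow(() ->
                new IllegalArgumentException(type + " schema could not be parsed"));
    }
}
```

With this shape, supporting a new schema type means adding one map entry rather than extending two if/else chains.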
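The subtle part of the reference path is the `SCHEMA_VERSION_FETCHER_CONFIG` entry: `parseSchema()` can only resolve a `SchemaReference` by fetching the referenced subject from the registry, which is why `validateSchema()` configures the provider with a client before parsing. A minimal sketch of that flow in isolation, assuming a registry at `http://localhost:8081` and an already-registered subject `user-value` at version 1 (the URL, subject, and schema text are placeholders):

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;

public class ReferenceValidationSketch {

    public static void main(String[] args) {
        // Placeholder registry URL; 10 is the client's identity-map capacity.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 10);

        AvroSchemaProvider provider = new AvroSchemaProvider();
        // Without a configured version fetcher, the provider has no way to
        // look up the subjects the references point at.
        provider.configure(Collections.singletonMap(
                SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, client));

        // Assumes subject "user-value" already holds the com.example.User record.
        List<SchemaReference> references = Collections.singletonList(
                new SchemaReference("com.example.User", "user-value", 1));

        // An Avro record that uses the referenced type by name.
        String schema = "{\"type\":\"record\",\"name\":\"Order\",\"fields\":"
                + "[{\"name\":\"buyer\",\"type\":\"com.example.User\"}]}";

        Optional<ParsedSchema> parsed = provider.parseSchema(schema, references);
        System.out.println(parsed.isPresent() ? "schema parsed" : "schema invalid");
    }
}
```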