2424import com .devshawn .kafka .gitops .service .KafkaService ;
2525import com .devshawn .kafka .gitops .service .ParserService ;
2626import com .devshawn .kafka .gitops .service .RoleService ;
27+ import com .devshawn .kafka .gitops .config .SchemaRegistryConfig ;
28+ import com .devshawn .kafka .gitops .config .SchemaRegistryConfigLoader ;
29+ import com .devshawn .kafka .gitops .service .SchemaRegistryService ;
2730import com .devshawn .kafka .gitops .util .LogUtil ;
2831import com .devshawn .kafka .gitops .util .StateUtil ;
2932import com .fasterxml .jackson .core .JsonParser ;
3033import com .fasterxml .jackson .databind .DeserializationFeature ;
3134import com .fasterxml .jackson .databind .ObjectMapper ;
3235import com .fasterxml .jackson .datatype .jdk8 .Jdk8Module ;
36+ import io .confluent .kafka .schemaregistry .ParsedSchema ;
37+ import io .confluent .kafka .schemaregistry .SchemaProvider ;
38+ import io .confluent .kafka .schemaregistry .avro .AvroSchemaProvider ;
39+ import io .confluent .kafka .schemaregistry .client .rest .entities .SchemaReference ;
3340import org .slf4j .LoggerFactory ;
3441
42+ import java .nio .file .Files ;
43+ import java .nio .file .Paths ;
3544import java .util .ArrayList ;
45+ import java .util .Collections ;
3646import java .util .List ;
3747import java .util .Map ;
3848import java .util .NoSuchElementException ;
@@ -48,6 +58,7 @@ public class StateManager {
4858 private final ObjectMapper objectMapper ;
4959 private final ParserService parserService ;
5060 private final KafkaService kafkaService ;
61+ private final SchemaRegistryService schemaRegistryService ;
5162 private final RoleService roleService ;
5263 private final ConfluentCloudService confluentCloudService ;
5364
@@ -61,17 +72,19 @@ public StateManager(ManagerConfig managerConfig, ParserService parserService) {
6172 this .managerConfig = managerConfig ;
6273 this .objectMapper = initializeObjectMapper ();
6374 this .kafkaService = new KafkaService (KafkaGitopsConfigLoader .load ());
75+ this .schemaRegistryService = new SchemaRegistryService (SchemaRegistryConfigLoader .load ());
6476 this .parserService = parserService ;
6577 this .roleService = new RoleService ();
6678 this .confluentCloudService = new ConfluentCloudService (objectMapper );
67- this .planManager = new PlanManager (managerConfig , kafkaService , objectMapper );
68- this .applyManager = new ApplyManager (managerConfig , kafkaService );
79+ this .planManager = new PlanManager (managerConfig , kafkaService , schemaRegistryService , objectMapper );
80+ this .applyManager = new ApplyManager (managerConfig , kafkaService , schemaRegistryService );
6981 }
7082
7183 public DesiredStateFile getAndValidateStateFile () {
7284 DesiredStateFile desiredStateFile = parserService .parseStateFile ();
7385 validateTopics (desiredStateFile );
7486 validateCustomAcls (desiredStateFile );
87+ validateSchemas (desiredStateFile );
7588 this .describeAclEnabled = StateUtil .isDescribeTopicAclEnabled (desiredStateFile );
7689 return desiredStateFile ;
7790 }
@@ -90,6 +103,7 @@ private DesiredPlan generatePlan() {
90103 planManager .planAcls (desiredState , desiredPlan );
91104 }
92105 planManager .planTopics (desiredState , desiredPlan );
106+ planManager .planSchemas (desiredState , desiredPlan );
93107 return desiredPlan .build ();
94108 }
95109
@@ -105,6 +119,7 @@ public DesiredPlan apply() {
105119 if (!managerConfig .isSkipAclsDisabled ()) {
106120 applyManager .applyAcls (desiredPlan );
107121 }
122+ applyManager .applySchemas (desiredPlan );
108123
109124 return desiredPlan ;
110125 }
@@ -145,6 +160,7 @@ private DesiredState getDesiredState() {
145160 .addAllPrefixedTopicsToIgnore (getPrefixedTopicsToIgnore (desiredStateFile ));
146161
147162 generateTopicsState (desiredState , desiredStateFile );
163+ generateSchemasState (desiredState , desiredStateFile );
148164
149165 if (isConfluentCloudEnabled (desiredStateFile )) {
150166 generateConfluentCloudServiceAcls (desiredState , desiredStateFile );
@@ -169,6 +185,10 @@ private void generateTopicsState(DesiredState.Builder desiredState, DesiredState
169185 }
170186 }
171187
188+ private void generateSchemasState (DesiredState .Builder desiredState , DesiredStateFile desiredStateFile ) {
189+ desiredState .putAllSchemas (desiredStateFile .getSchemas ());
190+ }
191+
172192 private void generateConfluentCloudServiceAcls (DesiredState .Builder desiredState , DesiredStateFile desiredStateFile ) {
173193 List <ServiceAccount > serviceAccounts = confluentCloudService .getServiceAccounts ();
174194 desiredStateFile .getServices ().forEach ((name , service ) -> {
@@ -321,6 +341,47 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
321341 }
322342 }
323343
344+ private void validateSchemas (DesiredStateFile desiredStateFile ) {
345+ if (!desiredStateFile .getSchemas ().isEmpty ()) {
346+ SchemaRegistryConfig schemaRegistryConfig = SchemaRegistryConfigLoader .load ();
347+ desiredStateFile .getSchemas ().forEach ((s , schemaDetails ) -> {
348+ if (!schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
349+ throw new ValidationException (String .format ("Schema type %s is currently not supported." , schemaDetails .getType ()));
350+ }
351+ if (!Files .exists (Paths .get (schemaRegistryConfig .getConfig ().get ("SCHEMA_DIRECTORY" ) + "/" + schemaDetails .getFile ()))) {
352+ throw new ValidationException (String .format ("Schema file %s not found in schema directory at %s" , schemaDetails .getFile (), schemaRegistryConfig .getConfig ().get ("SCHEMA_DIRECTORY" )));
353+ }
354+ if (schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
355+ AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider ();
356+ if (schemaDetails .getReferences ().isEmpty () && schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
357+ Optional <ParsedSchema > parsedSchema = avroSchemaProvider .parseSchema (schemaRegistryService .loadSchemaFromDisk (schemaDetails .getFile ()), Collections .emptyList ());
358+ if (!parsedSchema .isPresent ()) {
359+ throw new ValidationException (String .format ("Avro schema %s could not be parsed." , schemaDetails .getFile ()));
360+ }
361+ } else {
362+ List <SchemaReference > schemaReferences = new ArrayList <>();
363+ schemaDetails .getReferences ().forEach (referenceDetails -> {
364+ SchemaReference schemaReference = new SchemaReference (referenceDetails .getName (), referenceDetails .getSubject (), referenceDetails .getVersion ());
365+ schemaReferences .add (schemaReference );
366+ });
367+ // we need to pass a schema registry client as a config because the underlying code validates against the current state
368+ avroSchemaProvider .configure (Collections .singletonMap (SchemaProvider .SCHEMA_VERSION_FETCHER_CONFIG , schemaRegistryService .createSchemaRegistryClient ()));
369+ try {
370+ Optional <ParsedSchema > parsedSchema = avroSchemaProvider .parseSchema (schemaRegistryService .loadSchemaFromDisk (schemaDetails .getFile ()), schemaReferences );
371+ if (!parsedSchema .isPresent ()) {
372+ throw new ValidationException (String .format ("Avro schema %s could not be parsed." , schemaDetails .getFile ()));
373+ }
374+ } catch (IllegalStateException ex ) {
375+ throw new ValidationException (String .format ("Reference validation error: %s" , ex .getMessage ()));
376+ } catch (RuntimeException ex ) {
377+ throw new ValidationException (String .format ("Error thrown when attempting to validate schema with reference" , ex .getMessage ()));
378+ }
379+ }
380+ }
381+ });
382+ }
383+ }
384+
324385 private boolean isConfluentCloudEnabled (DesiredStateFile desiredStateFile ) {
325386 if (desiredStateFile .getSettings ().isPresent () && desiredStateFile .getSettings ().get ().getCcloud ().isPresent ()) {
326387 return desiredStateFile .getSettings ().get ().getCcloud ().get ().isEnabled ();
0 commit comments