Skip to content

Commit bff26cd

Browse files
authored
Feature/avro code generation (#1)
* feat: Implement AVRO code generation and AvroKafkaHandler - Add Maven AVRO plugin for automatic code generation from schema - Create contact.avsc schema file with nullable fields and proper namespace - Generate Contact class from AVRO schema (implements SpecificRecord) - Add AvroKafkaHandler for proper AVRO deserialization with Lambda Powertools - Update SAM template to use AvroKafkaHandler instead of HandlerMSK - Remove manually created Contact POJO in favor of generated AVRO class This enables proper AVRO message deserialization in the MSK Lambda consumer using Lambda Powertools with auto-generated SpecificRecord classes. * feat: Update producer to use AVRO-generated Contact class - Add AVRO Maven plugin to producer pom.xml - Remove manual Contact POJO from producer - Generate Contact class from AVRO schema for producer - Ensure producer and consumer use identical AVRO-generated Contact class - Fix payload compatibility between producer and consumer This ensures both functions use the same SpecificRecord implementation for consistent AVRO serialization/deserialization. * updates to producer * Switched to simple handler * added power tools log * Logger updates
1 parent bfaf10e commit bff26cd

File tree

16 files changed

+3013
-212
lines changed

16 files changed

+3013
-212
lines changed

msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/.classpath

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,16 @@
3636
<attribute name="maven.pomderived" value="true"/>
3737
</attributes>
3838
</classpathentry>
39+
<classpathentry kind="src" path="target/generated-sources/annotations">
40+
<attributes>
41+
<attribute name="optional" value="true"/>
42+
</attributes>
43+
</classpathentry>
44+
<classpathentry kind="src" output="target/test-classes" path="target/generated-test-sources/test-annotations">
45+
<attributes>
46+
<attribute name="optional" value="true"/>
47+
<attribute name="test" value="true"/>
48+
</attributes>
49+
</classpathentry>
3950
<classpathentry kind="output" path="target/classes"/>
4051
</classpath>
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
eclipse.preferences.version=1
22
encoding//src/main/java=UTF-8
3+
encoding//src/main/resources=UTF-8
34
encoding//src/test/java=UTF-8
45
encoding/<project>=UTF-8
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
eclipse.preferences.version=1
2-
org.eclipse.jdt.apt.aptEnabled=false
2+
org.eclipse.jdt.apt.aptEnabled=true
3+
org.eclipse.jdt.apt.genSrcDir=target/generated-sources/annotations
4+
org.eclipse.jdt.apt.genTestSrcDir=target/generated-test-sources/test-annotations

msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/.settings/org.eclipse.jdt.core.prefs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ org.eclipse.jdt.core.compiler.compliance=11
44
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
55
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
66
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
7-
org.eclipse.jdt.core.compiler.processAnnotations=disabled
7+
org.eclipse.jdt.core.compiler.processAnnotations=enabled
88
org.eclipse.jdt.core.compiler.release=disabled
99
org.eclipse.jdt.core.compiler.source=11

msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/pom.xml

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,67 @@
7070
<artifactId>avro</artifactId>
7171
<version>1.11.1</version>
7272
</dependency>
73+
74+
<dependency>
75+
<groupId>software.amazon.lambda</groupId>
76+
<artifactId>powertools-kafka</artifactId>
77+
<version>2.1.0</version>
78+
</dependency>
79+
80+
<dependency>
81+
<groupId>software.amazon.lambda</groupId>
82+
<artifactId>powertools-logging-log4j</artifactId>
83+
<version>2.1.0</version>
84+
</dependency>
85+
86+
<!-- Kafka clients dependency - compatibility works for >= 3.0.0 -->
87+
<dependency>
88+
<groupId>org.apache.kafka</groupId>
89+
<artifactId>kafka-clients</artifactId>
90+
<version>4.0.0</version>
91+
</dependency>
92+
93+
<!-- AspectJ Runtime for Lambda PowerTools -->
94+
<dependency>
95+
<groupId>org.aspectj</groupId>
96+
<artifactId>aspectjrt</artifactId>
97+
<version>1.9.22</version>
98+
</dependency>
99+
73100
</dependencies>
74101

75102
<build>
103+
<pluginManagement>
104+
<plugins>
105+
<!-- Lifecycle mapping for AspectJ plugin -->
106+
<plugin>
107+
<groupId>org.eclipse.m2e</groupId>
108+
<artifactId>lifecycle-mapping</artifactId>
109+
<version>1.0.0</version>
110+
<configuration>
111+
<lifecycleMappingMetadata>
112+
<pluginExecutions>
113+
<pluginExecution>
114+
<pluginExecutionFilter>
115+
<groupId>dev.aspectj</groupId>
116+
<artifactId>aspectj-maven-plugin</artifactId>
117+
<versionRange>[1.14,)</versionRange>
118+
<goals>
119+
<goal>compile</goal>
120+
</goals>
121+
</pluginExecutionFilter>
122+
<action>
123+
<execute>
124+
<runOnIncremental>true</runOnIncremental>
125+
</execute>
126+
</action>
127+
</pluginExecution>
128+
</pluginExecutions>
129+
</lifecycleMappingMetadata>
130+
</configuration>
131+
</plugin>
132+
</plugins>
133+
</pluginManagement>
76134
<plugins>
77135
<plugin>
78136
<groupId>org.apache.maven.plugins</groupId>
@@ -100,6 +158,55 @@
100158
<artifactId>maven-surefire-plugin</artifactId>
101159
<version>3.1.2</version>
102160
</plugin>
161+
<plugin>
162+
<groupId>org.apache.avro</groupId>
163+
<artifactId>avro-maven-plugin</artifactId>
164+
<version>1.11.3</version>
165+
<executions>
166+
<execution>
167+
<phase>generate-sources</phase>
168+
<goals>
169+
<goal>schema</goal>
170+
</goals>
171+
<configuration>
172+
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
173+
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
174+
<stringType>String</stringType>
175+
</configuration>
176+
</execution>
177+
</executions>
178+
</plugin>
179+
<plugin>
180+
<groupId>dev.aspectj</groupId>
181+
<artifactId>aspectj-maven-plugin</artifactId>
182+
<version>1.14</version>
183+
<configuration>
184+
<source>11</source> <!-- or higher -->
185+
<target>11</target> <!-- or higher -->
186+
<complianceLevel>11</complianceLevel> <!-- or higher -->
187+
<aspectLibraries>
188+
<aspectLibrary>
189+
<groupId>software.amazon.lambda</groupId>
190+
<artifactId>powertools-logging</artifactId>
191+
</aspectLibrary>
192+
</aspectLibraries>
193+
</configuration>
194+
<dependencies>
195+
<dependency>
196+
<groupId>org.aspectj</groupId>
197+
<artifactId>aspectjtools</artifactId>
198+
<!-- AspectJ compiler version, in sync with runtime -->
199+
<version>1.9.22</version>
200+
</dependency>
201+
</dependencies>
202+
<executions>
203+
<execution>
204+
<goals>
205+
<goal>compile</goal>
206+
</goals>
207+
</execution>
208+
</executions>
209+
</plugin>
103210
</plugins>
104211
</build>
105212
</project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"type": "record",
3+
"name": "Contact",
4+
"namespace": "com.amazonaws.services.lambda.samples.events.msk",
5+
"fields": [
6+
{"name": "firstname", "type": ["null", "string"], "default": null},
7+
{"name": "lastname", "type": ["null", "string"], "default": null},
8+
{"name": "company", "type": ["null", "string"], "default": null},
9+
{"name": "street", "type": ["null", "string"], "default": null},
10+
{"name": "city", "type": ["null", "string"], "default": null},
11+
{"name": "county", "type": ["null", "string"], "default": null},
12+
{"name": "state", "type": ["null", "string"], "default": null},
13+
{"name": "zip", "type": ["null", "string"], "default": null},
14+
{"name": "homePhone", "type": ["null", "string"], "default": null},
15+
{"name": "cellPhone", "type": ["null", "string"], "default": null},
16+
{"name": "email", "type": ["null", "string"], "default": null},
17+
{"name": "website", "type": ["null", "string"], "default": null}
18+
]
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.amazonaws.services.lambda.samples.events.msk;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.apache.kafka.clients.consumer.ConsumerRecords;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import software.amazon.lambda.powertools.kafka.Deserialization;
10+
import software.amazon.lambda.powertools.kafka.DeserializationType;
11+
import software.amazon.lambda.powertools.logging.Logging;
12+
13+
public class AvroKafkaHandler implements RequestHandler<ConsumerRecords<String, Contact>, String> {
14+
private static final Logger LOGGER = LoggerFactory.getLogger(AvroKafkaHandler.class);
15+
16+
@Override
17+
@Logging(logEvent = true)
18+
@Deserialization(type = DeserializationType.KAFKA_AVRO)
19+
public String handleRequest(ConsumerRecords<String, Contact> records, Context context) {
20+
LOGGER.info("=== AvroKafkaHandler called ===");
21+
LOGGER.info("Event object: {}", records);
22+
LOGGER.info("Number of records: {}", records.count());
23+
24+
for (ConsumerRecord<String, Contact> record : records) {
25+
LOGGER.info("Processing record - Topic: {}, Partition: {}, Offset: {}",
26+
record.topic(), record.partition(), record.offset());
27+
LOGGER.info("Record key: {}", record.key());
28+
LOGGER.info("Record value: {}", record.value());
29+
30+
if (record.value() != null) {
31+
Contact contact = record.value();
32+
LOGGER.info("Contact details - firstName: {}, zip: {}",
33+
contact.getFirstname(), contact.getZip());
34+
}
35+
}
36+
37+
LOGGER.info("=== AvroKafkaHandler completed ===");
38+
return "OK";
39+
}
40+
}

0 commit comments

Comments
 (0)