|
| 1 | +//Lambda Runtime delivers a batch of messages to the lambda function |
| 2 | +//Each batch of messages has two fields EventSource and EventSourceARN |
| 3 | +//Each batch of messages also has a field called Records |
| 4 | +//The Records is a map with multiple keys and values |
| 5 | +//Each key is a combination of the Topic Name and the Partition Number |
| 6 | +//One batch of messages can contain messages from multiple partitions |
| 7 | + |
| 8 | +/* |
| 9 | +To simplify representing a batch of Kafka messages as a list of messages |
| 10 | +We have created a Java class called KafkaMessage under the models package |
| 11 | +Here we are mapping the structure of an incoming Kafka event to a list of |
| 12 | +objects of the KafkaMessage class |
| 13 | +*/ |
| 14 | + |
| 15 | +package com.amazonaws.services.lambda.samples.events.msk; |
| 16 | + |
| 17 | +import com.amazonaws.services.lambda.runtime.Context; |
| 18 | +import com.amazonaws.services.lambda.runtime.LambdaLogger; |
| 19 | +import com.amazonaws.services.lambda.runtime.RequestHandler; |
| 20 | +import com.amazonaws.services.lambda.runtime.events.KafkaEvent; |
| 21 | +import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; |
| 22 | + |
| 23 | +import java.util.ArrayList; |
| 24 | +import java.util.Base64; |
| 25 | +import java.util.Iterator; |
| 26 | +import java.util.List; |
| 27 | +import java.util.Map; |
| 28 | +import java.util.Set; |
| 29 | +import com.google.gson.Gson; |
| 30 | +import com.google.gson.GsonBuilder; |
| 31 | + |
| 32 | +public class HandlerMSK implements RequestHandler<KafkaEvent, String>{ |
| 33 | + //We initialize an empty list of the KafkaMessage class |
| 34 | + List<KafkaMessage> listOfMessages = new ArrayList<KafkaMessage>(); |
| 35 | + Gson gson = new GsonBuilder().setPrettyPrinting().create(); |
| 36 | + @Override |
| 37 | + public String handleRequest(KafkaEvent event, Context context) { |
| 38 | + LambdaLogger logger = context.getLogger(); |
| 39 | + logger.log("========== LAMBDA FUNCTION STARTED =========="); |
| 40 | + logger.log("Event received: " + gson.toJson(event)); |
| 41 | + |
| 42 | + String response = new String("200 OK"); |
| 43 | + this.listOfMessages = new ArrayList<KafkaMessage>(); |
| 44 | + |
| 45 | + // Counters for zip code patterns |
| 46 | + int zip1000Count = 0; |
| 47 | + int zip2000Count = 0; |
| 48 | + //Incoming KafkaEvent object has a property called records that is a map |
| 49 | + //Each key in the map is a combination of a topic and a partition |
| 50 | + Map<String, List<KafkaEventRecord>> record=event.getRecords(); |
| 51 | + |
| 52 | + if (record == null) { |
| 53 | + logger.log("WARNING: Event records map is null"); |
| 54 | + return response; |
| 55 | + } |
| 56 | + |
| 57 | + logger.log("Records map size: " + record.size()); |
| 58 | + |
| 59 | + Set<String> keySet = record.keySet(); |
| 60 | + logger.log("Key set size: " + keySet.size()); |
| 61 | + logger.log("Keys: " + keySet); |
| 62 | + |
| 63 | + Iterator<String> iterator = keySet.iterator(); |
| 64 | + //We iterate through each of the keys in the map |
| 65 | + while (iterator.hasNext()) { |
| 66 | + String thisKey=(String)iterator.next(); |
| 67 | + logger.log("Processing key: " + thisKey); |
| 68 | + |
| 69 | + //Using the key we retrieve the value of the map which is a list of KafkaEventRecord |
| 70 | + //One object of KafkaEventRecord represents an individual Kafka message |
| 71 | + List<KafkaEventRecord> thisListOfRecords = record.get(thisKey); |
| 72 | + |
| 73 | + if (thisListOfRecords == null) { |
| 74 | + logger.log("WARNING: Record list for key " + thisKey + " is null"); |
| 75 | + continue; |
| 76 | + } |
| 77 | + |
| 78 | + logger.log("Record list size for key " + thisKey + ": " + thisListOfRecords.size()); |
| 79 | + |
| 80 | + //We now iterate through the list of KafkaEventRecords |
| 81 | + for(KafkaEventRecord thisRecord : thisListOfRecords) { |
| 82 | + logger.log("Processing record..."); |
| 83 | + |
| 84 | + /* |
| 85 | + We initialize a new object of the KafkaMessage class which is a simplified representation in our models package |
| 86 | + We then get the fields from each kafka message in the object of KafkaEventRecord class and set them to the fields |
| 87 | + of the KafkaRecord class |
| 88 | + */ |
| 89 | + KafkaMessage thisMessage = new KafkaMessage(); |
| 90 | + thisMessage.setTopic(thisRecord.getTopic()); |
| 91 | + thisMessage.setPartition(thisRecord.getPartition()); |
| 92 | + thisMessage.setOffset(thisRecord.getOffset()); |
| 93 | + thisMessage.setTimestamp(thisRecord.getTimestamp()); |
| 94 | + thisMessage.setTimestampType(thisRecord.getTimestampType()); |
| 95 | + |
| 96 | + logger.log("Record metadata - Topic: " + thisRecord.getTopic() + |
| 97 | + ", Partition: " + thisRecord.getPartition() + |
| 98 | + ", Offset: " + thisRecord.getOffset()); |
| 99 | + |
| 100 | + String key = thisRecord.getKey(); |
| 101 | + String value = thisRecord.getValue(); |
| 102 | + |
| 103 | + logger.log("Key (base64): " + key); |
| 104 | + logger.log("Value (base64): " + value); |
| 105 | + |
| 106 | + String decodedKey = "null"; |
| 107 | + String decodedValue = "null"; |
| 108 | + //the key and value inside a kafka message are base64 encrypted and will need to be decrypted |
| 109 | + if (null != key) { |
| 110 | + logger.log("Decoding key..."); |
| 111 | + try { |
| 112 | + byte[] decodedKeyBytes = Base64.getDecoder().decode(key); |
| 113 | + decodedKey = new String(decodedKeyBytes); |
| 114 | + logger.log("Decoded key: " + decodedKey); |
| 115 | + } catch (Exception e) { |
| 116 | + logger.log("ERROR decoding key: " + e.getMessage()); |
| 117 | + } |
| 118 | + } else { |
| 119 | + logger.log("Key is null"); |
| 120 | + } |
| 121 | + |
| 122 | + if (null != value) { |
| 123 | + logger.log("Decoding value..."); |
| 124 | + try { |
| 125 | + byte[] decodedValueBytes = Base64.getDecoder().decode(value); |
| 126 | + logger.log("Value decoded, length: " + decodedValueBytes.length + " bytes"); |
| 127 | + |
| 128 | + // Print the complete message in hex format |
| 129 | + logger.log("Complete message in hex format:"); |
| 130 | + logger.log(bytesToHexString(decodedValueBytes, 0)); |
| 131 | + |
| 132 | + try { |
| 133 | + decodedValue = new String(decodedValueBytes); |
| 134 | + logger.log("Decoded value as string: " + (decodedValue.length() > 100 ? decodedValue.substring(0, 100) + "..." : decodedValue)); |
| 135 | + |
| 136 | + // Add more detailed logging for AVRO messages |
| 137 | + logger.log("=== AVRO MESSAGE DETAILS ==="); |
| 138 | + logger.log("Message appears to be AVRO-formatted. Attempting to extract fields:"); |
| 139 | + |
| 140 | + // Try to extract some common fields from the AVRO binary data |
| 141 | + // This is a simple approach to show some readable content |
| 142 | + StringBuilder readableContent = new StringBuilder(); |
| 143 | + for (int i = 0; i < decodedValueBytes.length; i++) { |
| 144 | + // Skip non-printable characters |
| 145 | + if (decodedValueBytes[i] >= 32 && decodedValueBytes[i] < 127) { |
| 146 | + readableContent.append((char)decodedValueBytes[i]); |
| 147 | + } |
| 148 | + } |
| 149 | + |
| 150 | + String readableString = readableContent.toString(); |
| 151 | + logger.log("Readable content extracted from AVRO: " + readableString); |
| 152 | + |
| 153 | + // Check for zip code patterns |
| 154 | + if (readableString.contains("1000")) { |
| 155 | + logger.log("FOUND ZIP CODE STARTING WITH 1000"); |
| 156 | + } |
| 157 | + if (readableString.contains("2000")) { |
| 158 | + logger.log("FOUND ZIP CODE STARTING WITH 2000"); |
| 159 | + } |
| 160 | + |
| 161 | + logger.log("=== END AVRO MESSAGE DETAILS ==="); |
| 162 | + } catch (Exception e) { |
| 163 | + logger.log("ERROR converting bytes to string: " + e.getMessage()); |
| 164 | + decodedValue = "Error decoding: " + e.getMessage(); |
| 165 | + } |
| 166 | + } catch (Exception e) { |
| 167 | + logger.log("ERROR decoding value: " + e.getMessage()); |
| 168 | + e.printStackTrace(); |
| 169 | + } |
| 170 | + } else { |
| 171 | + logger.log("Value is null"); |
| 172 | + } |
| 173 | + |
| 174 | + thisMessage.setKey(key); |
| 175 | + thisMessage.setValue(value); |
| 176 | + thisMessage.setDecodedKey(decodedKey); |
| 177 | + thisMessage.setDecodedValue(decodedValue); |
| 178 | + |
| 179 | + //A kafka message can optionally have a list of headers |
| 180 | + //the below code is to get the headers, iterate through each header and get its key and value |
| 181 | + List<KafkaHeader> headersInThisMessage = new ArrayList<KafkaHeader>(); |
| 182 | + List<Map<String, byte[]>> headers = thisRecord.getHeaders(); |
| 183 | + |
| 184 | + if (headers != null) { |
| 185 | + logger.log("Headers count: " + headers.size()); |
| 186 | + |
| 187 | + for (Map<String, byte[]> thisHeader : headers) { |
| 188 | + Set<String> thisHeaderKeys = thisHeader.keySet(); |
| 189 | + Iterator<String> thisHeaderKeysIterator = thisHeaderKeys.iterator(); |
| 190 | + while (thisHeaderKeysIterator.hasNext()) { |
| 191 | + String thisHeaderKey = thisHeaderKeysIterator.next(); |
| 192 | + byte[] thisHeaderValue = (byte[])thisHeader.get(thisHeaderKey); |
| 193 | + String thisHeaderValueString = new String(thisHeaderValue); |
| 194 | + KafkaHeader thisMessageHeader = new KafkaHeader(); |
| 195 | + thisMessageHeader.setKey(thisHeaderKey); |
| 196 | + thisMessageHeader.setValue(thisHeaderValueString); |
| 197 | + headersInThisMessage.add(thisMessageHeader); |
| 198 | + logger.log("Header - Key: " + thisHeaderKey + ", Value: " + thisHeaderValueString); |
| 199 | + } |
| 200 | + } |
| 201 | + } else { |
| 202 | + logger.log("No headers in message"); |
| 203 | + } |
| 204 | + |
| 205 | + thisMessage.setHeaders(headersInThisMessage); |
| 206 | + listOfMessages.add(thisMessage); |
| 207 | + |
| 208 | + // Below we are logging the particular kafka message in string format using the toString method |
| 209 | + // as well as in Json format using gson.toJson function |
| 210 | + logger.log("Received this message from Kafka - " + thisMessage.toString()); |
| 211 | + logger.log("Message in JSON format : " + gson.toJson(thisMessage)); |
| 212 | + |
| 213 | + // Add a more readable summary of the message |
| 214 | + logger.log("=== MESSAGE SUMMARY ==="); |
| 215 | + logger.log("Topic: " + thisMessage.getTopic()); |
| 216 | + logger.log("Partition: " + thisMessage.getPartition()); |
| 217 | + logger.log("Offset: " + thisMessage.getOffset()); |
| 218 | + logger.log("Key: " + thisMessage.getDecodedKey()); |
| 219 | + |
| 220 | + // Check for zip code patterns in the decoded value |
| 221 | + String decodedValueStr = thisMessage.getDecodedValue(); |
| 222 | + if (decodedValueStr != null) { |
| 223 | + if (decodedValueStr.contains("1000")) { |
| 224 | + logger.log("ZIP CODE: Found 1000 pattern in message"); |
| 225 | + zip1000Count++; |
| 226 | + } |
| 227 | + if (decodedValueStr.contains("2000")) { |
| 228 | + logger.log("ZIP CODE: Found 2000 pattern in message"); |
| 229 | + zip2000Count++; |
| 230 | + } |
| 231 | + } |
| 232 | + |
| 233 | + logger.log("=== END MESSAGE SUMMARY ==="); |
| 234 | + } |
| 235 | + } |
| 236 | + logger.log("All Messages in this batch = " + gson.toJson(listOfMessages)); |
| 237 | + |
| 238 | + // Log summary of zip code distribution |
| 239 | + logger.log("========== ZIP CODE DISTRIBUTION SUMMARY =========="); |
| 240 | + logger.log("Messages with zip code containing 1000: " + zip1000Count); |
| 241 | + logger.log("Messages with zip code containing 2000: " + zip2000Count); |
| 242 | + logger.log("Other messages: " + (listOfMessages.size() - zip1000Count - zip2000Count)); |
| 243 | + logger.log("===================================================="); |
| 244 | + |
| 245 | + logger.log("========== LAMBDA FUNCTION COMPLETED =========="); |
| 246 | + return response; |
| 247 | + } |
| 248 | + |
| 249 | + /** |
| 250 | + * Convert byte array to hexadecimal string representation |
| 251 | + * |
| 252 | + * @param bytes Byte array to convert |
| 253 | + * @param maxLength Maximum number of bytes to convert (0 for all) |
| 254 | + * @return Hexadecimal string representation |
| 255 | + */ |
| 256 | + private String bytesToHexString(byte[] bytes, int maxLength) { |
| 257 | + StringBuilder sb = new StringBuilder(); |
| 258 | + int length = maxLength > 0 && maxLength < bytes.length ? maxLength : bytes.length; |
| 259 | + |
| 260 | + for (int i = 0; i < length; i++) { |
| 261 | + sb.append(String.format("%02X", bytes[i])); |
| 262 | + if (i % 16 == 15) { |
| 263 | + sb.append("\n"); |
| 264 | + } else if (i % 4 == 3) { |
| 265 | + sb.append(" "); |
| 266 | + } |
| 267 | + } |
| 268 | + |
| 269 | + if (maxLength > 0 && length < bytes.length) { |
| 270 | + sb.append("... (").append(bytes.length - length).append(" more bytes)"); |
| 271 | + } |
| 272 | + |
| 273 | + return sb.toString(); |
| 274 | + } |
| 275 | +} |
0 commit comments