diff --git a/source-quickstart.properties b/source-quickstart.properties
index cc31c4a..7247d56 100644
--- a/source-quickstart.properties
+++ b/source-quickstart.properties
@@ -1,3 +1,10 @@
 name=dynamodb-source-test
 connector.class=dynamok.source.DynamoDbSourceConnector
 region=us-west-2
+access.key.id=somekey
+secret.key=somesecret
+tables.prefix=
+tables.whitelist=helloworld
+tables.blacklist=
+topic.format=exampleupstream
+log.record.stream=true
\ No newline at end of file
diff --git a/src/main/java/dynamok/source/ConnectorConfig.java b/src/main/java/dynamok/source/ConnectorConfig.java
index beb3e86..5321cad 100644
--- a/src/main/java/dynamok/source/ConnectorConfig.java
+++ b/src/main/java/dynamok/source/ConnectorConfig.java
@@ -38,6 +38,7 @@ private enum Keys {
         static final String TABLES_WHITELIST = "tables.whitelist";
         static final String TABLES_BLACKLIST = "tables.blacklist";
         static final String TOPIC_FORMAT = "topic.format";
+        static final String LOG_RECORD_STREAM = "log.record.stream";
     }
 
     static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -59,7 +60,9 @@ private enum Keys {
            .define(Keys.TABLES_BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(),
                    ConfigDef.Importance.MEDIUM, "Blacklist for DynamoDB tables to source from.")
            .define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${table}",
-                   ConfigDef.Importance.HIGH, "Format string for destination Kafka topic, use ``${table}`` as placeholder for source table name.");
+                   ConfigDef.Importance.HIGH, "Format string for destination Kafka topic, use ``${table}`` as placeholder for source table name.")
+           .define(Keys.LOG_RECORD_STREAM, ConfigDef.Type.STRING, "false",
+                   ConfigDef.Importance.LOW, "Enable record level logging of Dynamo stream.");
 
     final Regions region;
     final Password accessKeyId;
     final Password secretKey;
     final String topicFormat;
     final String tablesPrefix;
     final List<String> tablesWhitelist;
     final List<String> tablesBlacklist;
+    final String isRecordStreamLogOn;
 ConnectorConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
@@ -78,6 +82,7 @@ private enum Keys {
        tablesWhitelist = getList(Keys.TABLES_WHITELIST);
        tablesBlacklist = getList(Keys.TABLES_BLACKLIST);
        topicFormat = getString(Keys.TOPIC_FORMAT);
+       isRecordStreamLogOn = getString(Keys.LOG_RECORD_STREAM);
    }
 
    public static void main(String... args) {
diff --git a/src/main/java/dynamok/source/DynamoDbSourceConnector.java b/src/main/java/dynamok/source/DynamoDbSourceConnector.java
index 19849cf..1a45cd5 100644
--- a/src/main/java/dynamok/source/DynamoDbSourceConnector.java
+++ b/src/main/java/dynamok/source/DynamoDbSourceConnector.java
@@ -68,6 +68,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
                taskConfig.put(shard.getShardId() + "." + TaskConfig.Keys.TABLE, tableDesc.getTableName());
                taskConfig.put(shard.getShardId() + "." + TaskConfig.Keys.STREAM_ARN, tableDesc.getLatestStreamArn());
            });
+           taskConfig.put(TaskConfig.Keys.LOG_RECORD_STREAM, config.isRecordStreamLogOn);
            return taskConfig;
        }).collect(Collectors.toList());
    }
diff --git a/src/main/java/dynamok/source/DynamoDbSourceTask.java b/src/main/java/dynamok/source/DynamoDbSourceTask.java
index e88316f..00ac5c7 100644
--- a/src/main/java/dynamok/source/DynamoDbSourceTask.java
+++ b/src/main/java/dynamok/source/DynamoDbSourceTask.java
@@ -19,11 +19,13 @@
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
 import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.Record;
 import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
 import com.amazonaws.services.dynamodbv2.model.StreamRecord;
 import dynamok.Version;
 import org.apache.kafka.connect.errors.ConnectException;
 import
org.apache.kafka.connect.source.SourceRecord;
@@ -46,6 +42,8 @@ private enum Keys {
        static final String SHARD = "shard";
        static final String SEQNUM = "seqNum";
    }
+   private static final String REMOVE = "REMOVE";
+   private static final String OPERATION_TYPE = "OperationType";
 
    private final Logger log = LoggerFactory.getLogger(getClass());
@@ -106,20 +104,37 @@ public List<SourceRecord> poll() throws InterruptedException {
        final String topic = config.topicFormat.replace("${table}", tableName);
        final Map<String, String> sourcePartition = sourcePartition(shardId);
-       return rsp.getRecords().stream()
-               .map(dynamoRecord -> toSourceRecord(sourcePartition, topic, dynamoRecord.getDynamodb()))
-               .collect(Collectors.toList());
+       try {
+           return rsp.getRecords().stream()
+                   .map(dynamoRecord -> toSourceRecord(sourcePartition, topic, dynamoRecord))
+                   .collect(Collectors.toList());
+       }
+
+       catch (Exception exception) {
+           log.error("Failed to stream data into Kafka", exception);
+           return null;
+       }
    }
 
-   private SourceRecord toSourceRecord(Map<String, String> sourcePartition, String topic, StreamRecord dynamoRecord) {
-       return new SourceRecord(
-               sourcePartition,
-               Collections.singletonMap(Keys.SEQNUM, dynamoRecord.getSequenceNumber()),
-               topic, null,
-               RecordMapper.attributesSchema(), RecordMapper.toConnect(dynamoRecord.getKeys()),
-               RecordMapper.attributesSchema(), RecordMapper.toConnect(dynamoRecord.getNewImage()),
-               dynamoRecord.getApproximateCreationDateTime().getTime()
-       );
+   private SourceRecord toSourceRecord(Map<String, String> sourcePartition, String topic, Record record) {
+       StreamRecord dynamoRecord = record.getDynamodb();
+       if (config.isRecordStreamLogOn) {
+           log.info(record.toString());
+       }
+
+       // Copy before adding OperationType so we do not mutate the map owned by the SDK Record
+       Map<String, AttributeValue> keyAttributeMap = new HashMap<>(dynamoRecord.getKeys());
+       keyAttributeMap.put(OPERATION_TYPE, new AttributeValue().withS(record.getEventName()));
+       Map<String, AttributeValue> valueAttributeMap = REMOVE.equals(record.getEventName()) ?
new HashMap<>() : dynamoRecord.getNewImage();
+       return new SourceRecord(
+               sourcePartition,
+               Collections.singletonMap(Keys.SEQNUM, dynamoRecord.getSequenceNumber()),
+               topic, null,
+               RecordMapper.attributesSchema(), RecordMapper.toConnect(keyAttributeMap),
+               RecordMapper.attributesSchema(), RecordMapper.toConnect(valueAttributeMap),
+               dynamoRecord.getApproximateCreationDateTime().getTime()
+       );
+
+   }
 
    private String shardIterator(String shardId) {
diff --git a/src/main/java/dynamok/source/TaskConfig.java b/src/main/java/dynamok/source/TaskConfig.java
index 461a7fa..702434e 100644
--- a/src/main/java/dynamok/source/TaskConfig.java
+++ b/src/main/java/dynamok/source/TaskConfig.java
@@ -36,6 +36,7 @@ enum Keys {
        static String SHARDS = "shards";
        static String TABLE = "table";
        static String STREAM_ARN = "stream.arn";
+       static String LOG_RECORD_STREAM = "log.record.stream";
    }
 
    private final Map<String, String> props;
@@ -45,7 +46,7 @@ enum Keys {
    final String secretKey;
    final String topicFormat;
    final List<String> shards;
-
+   final Boolean isRecordStreamLogOn;
 
    TaskConfig(Map<String, String> props) {
        this.props = props;
@@ -54,6 +55,7 @@ enum Keys {
        secretKey = getValue(Keys.SECRET_KEY, "");
        topicFormat = getValue(Keys.TOPIC_FORMAT);
        shards = Arrays.stream(getValue(Keys.SHARDS).split(",")).filter(shardId -> !shardId.isEmpty()).collect(Collectors.toList());
+       isRecordStreamLogOn = Boolean.valueOf(getValue(Keys.LOG_RECORD_STREAM));
    }
 
    String tableForShard(String shardId) {