Skip to content
This repository was archived by the owner on Aug 19, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions source-quickstart.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
name=dynamodb-source-test
connector.class=dynamok.source.DynamoDbSourceConnector
region=us-west-2
# NOTE(review): the line below had no property key (Java properties would parse it as a bogus key "arn"); commented out — assign it to a real key (e.g. table.arn=...) if needed, and avoid committing real account ARNs.
#arn:aws:dynamodb:us-west-2:360292250396:table/ci_calltracking_campaign
access.key.id=somekey
secret.key=somesecret
tables.prefix=
tables.whitelist=helloworld
tables.blacklist=
topic.format=exampleupstream
log.record.stream=true
7 changes: 6 additions & 1 deletion src/main/java/dynamok/source/ConnectorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private enum Keys {
static final String TABLES_WHITELIST = "tables.whitelist";
static final String TABLES_BLACKLIST = "tables.blacklist";
static final String TOPIC_FORMAT = "topic.format";
static final String LOG_RECORD_STREAM= "log.record.stream";
}

static final ConfigDef CONFIG_DEF = new ConfigDef()
Expand All @@ -59,7 +60,9 @@ private enum Keys {
.define(Keys.TABLES_BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(),
ConfigDef.Importance.MEDIUM, "Blacklist for DynamoDB tables to source from.")
.define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${table}",
ConfigDef.Importance.HIGH, "Format string for destination Kafka topic, use ``${table}`` as placeholder for source table name.");
ConfigDef.Importance.HIGH, "Format string for destination Kafka topic, use ``${table}`` as placeholder for source table name.")
.define(Keys.LOG_RECORD_STREAM,ConfigDef.Type.STRING, "false",
ConfigDef.Importance.LOW,"Enable record level logging of Dynamo stream.");

final Regions region;
final Password accessKeyId;
Expand All @@ -68,6 +71,7 @@ private enum Keys {
final String tablesPrefix;
final List<String> tablesWhitelist;
final List<String> tablesBlacklist;
final String isRecordStreamLogOn;

ConnectorConfig(Map<String, String> props) {
super(CONFIG_DEF, props);
Expand All @@ -78,6 +82,7 @@ private enum Keys {
tablesWhitelist = getList(Keys.TABLES_WHITELIST);
tablesBlacklist = getList(Keys.TABLES_BLACKLIST);
topicFormat = getString(Keys.TOPIC_FORMAT);
isRecordStreamLogOn = getString(Keys.LOG_RECORD_STREAM);
}

public static void main(String... args) {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/dynamok/source/DynamoDbSourceConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
taskConfig.put(shard.getShardId() + "." + TaskConfig.Keys.TABLE, tableDesc.getTableName());
taskConfig.put(shard.getShardId() + "." + TaskConfig.Keys.STREAM_ARN, tableDesc.getLatestStreamArn());
});
taskConfig.put(TaskConfig.Keys.LOG_RECORD_STREAM, config.isRecordStreamLogOn);
return taskConfig;
}).collect(Collectors.toList());
}
Expand Down
49 changes: 32 additions & 17 deletions src/main/java/dynamok/source/DynamoDbSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamRecord;
import com.amazonaws.services.dynamodbv2.model.*;
import dynamok.Version;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -46,6 +42,8 @@ private enum Keys {
static final String SHARD = "shard";
static final String SEQNUM = "seqNum";
}
private static final String REMOVE = "REMOVE";
private static final String OPERATIONTYPE = "OperationType";

private final Logger log = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -106,20 +104,37 @@ public List<SourceRecord> poll() throws InterruptedException {
final String topic = config.topicFormat.replace("${table}", tableName);
final Map<String, String> sourcePartition = sourcePartition(shardId);

return rsp.getRecords().stream()
.map(dynamoRecord -> toSourceRecord(sourcePartition, topic, dynamoRecord.getDynamodb()))
.collect(Collectors.toList());
try {
return rsp.getRecords().stream()
.map(dynamoRecord -> toSourceRecord(sourcePartition, topic, dynamoRecord))
.collect(Collectors.toList());
}

catch (Exception exception) {
log.error("Failed to stream data into Kafka {}", exception.toString());
return null;
}
}

private SourceRecord toSourceRecord(Map<String, String> sourcePartition, String topic, StreamRecord dynamoRecord) {
return new SourceRecord(
sourcePartition,
Collections.singletonMap(Keys.SEQNUM, dynamoRecord.getSequenceNumber()),
topic, null,
RecordMapper.attributesSchema(), RecordMapper.toConnect(dynamoRecord.getKeys()),
RecordMapper.attributesSchema(), RecordMapper.toConnect(dynamoRecord.getNewImage()),
dynamoRecord.getApproximateCreationDateTime().getTime()
);
/**
 * Converts a DynamoDB Streams {@code Record} into a Kafka Connect {@code SourceRecord}.
 *
 * <p>The event name (INSERT/MODIFY/REMOVE) is added to the key map under
 * {@code OperationType}. REMOVE events carry no NewImage, so an empty value
 * map is emitted for them.
 *
 * @param sourcePartition Connect source-partition map identifying the shard
 * @param topic           destination Kafka topic (already resolved from topic.format)
 * @param record          the raw stream record from GetRecords
 * @return a SourceRecord positioned at this record's sequence number
 */
private SourceRecord toSourceRecord(Map<String, String> sourcePartition, String topic, Record record) {
    StreamRecord dynamoRecord = record.getDynamodb();
    // Only log when the flag is enabled; the previous unconditional
    // log.info(config.isRecordStreamLogOn.toString()) spammed the log on every record.
    if (config.isRecordStreamLogOn) {
        log.info(record.toString());
    }

    // Defensive copy: do not mutate the key map owned by the SDK Record.
    Map<String, AttributeValue> keyAttributeMap = new HashMap<>(dynamoRecord.getKeys());
    keyAttributeMap.put(OPERATIONTYPE, new AttributeValue().withS(record.getEventName()));

    // Constant-first equals avoids an NPE if eventName is ever absent.
    // NOTE(review): for non-REMOVE events this assumes NewImage is present,
    // i.e. the stream view type includes new images — confirm stream config.
    Map<String, AttributeValue> valueAttributeMap =
            REMOVE.equals(record.getEventName()) ? new HashMap<>() : dynamoRecord.getNewImage();

    return new SourceRecord(
            sourcePartition,
            Collections.singletonMap(Keys.SEQNUM, dynamoRecord.getSequenceNumber()),
            topic, null,
            RecordMapper.attributesSchema(), RecordMapper.toConnect(keyAttributeMap),
            RecordMapper.attributesSchema(), RecordMapper.toConnect(valueAttributeMap),
            dynamoRecord.getApproximateCreationDateTime().getTime()
    );
}

private String shardIterator(String shardId) {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/dynamok/source/TaskConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum Keys {
static String SHARDS = "shards";
static String TABLE = "table";
static String STREAM_ARN = "stream.arn";
static String LOG_RECORD_STREAM= "log.record.stream";
}

private final Map<String, String> props;
Expand All @@ -45,7 +46,7 @@ enum Keys {
final String secretKey;
final String topicFormat;
final List<String> shards;

final Boolean isRecordStreamLogOn;
TaskConfig(Map<String, String> props) {
this.props = props;

Expand All @@ -54,6 +55,7 @@ enum Keys {
secretKey = getValue(Keys.SECRET_KEY, "");
topicFormat = getValue(Keys.TOPIC_FORMAT);
shards = Arrays.stream(getValue(Keys.SHARDS).split(",")).filter(shardId -> !shardId.isEmpty()).collect(Collectors.toList());
isRecordStreamLogOn = Boolean.valueOf(getValue(Keys.LOG_RECORD_STREAM));
}

String tableForShard(String shardId) {
Expand Down