14 changes: 14 additions & 0 deletions README.rst
@@ -173,6 +173,20 @@ Configuration options
* Default: 3000
* Importance: medium

``use.updates``
Whether to use the DynamoDB update API instead of the put (insert) API.

* Type: boolean
* Default: false
* Importance: medium

``table.keys.names``
List of the form ``[hash_key_name[,range_key_name]]``: the table's hash key name and, optionally, its range key name. Must be provided when the ``use.updates`` flag is enabled; see the example below.

* Type: list
* Default: []
* Importance: medium
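
For example, a sink that upserts into a table keyed by a ``customer_id`` hash key and an ``order_ts`` range key (illustrative names; substitute your table's key attributes) would add the following to its configuration (only the new options are shown)::

    use.updates=true
    table.keys.names=customer_id,order_ts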

Source Connector
================

1 change: 1 addition & 0 deletions src/main/java/dynamok/sink/AttributeValueConverter.java
@@ -17,6 +17,7 @@
package dynamok.sink;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
16 changes: 15 additions & 1 deletion src/main/java/dynamok/sink/ConnectorConfig.java
@@ -43,6 +43,8 @@ private enum Keys {
static final String TOP_VALUE_ATTRIBUTE = "top.value.attribute";
static final String MAX_RETRIES = "max.retries";
static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
static final String USE_UPDATES = "use.updates";
static final String TABLE_KEYS_NAMES = "table.keys.names";
}

static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -80,7 +82,15 @@ private enum Keys {
.define(Keys.MAX_RETRIES, ConfigDef.Type.INT, 10,
ConfigDef.Importance.MEDIUM, "The maximum number of times to retry on errors before failing the task.")
.define(Keys.RETRY_BACKOFF_MS, ConfigDef.Type.INT, 3000,
ConfigDef.Importance.MEDIUM, "The time in milliseconds to wait following an error before a retry attempt is made.");
ConfigDef.Importance.MEDIUM, "The time in milliseconds to wait following an error before a retry attempt is made.")
.define(Keys.USE_UPDATES, ConfigDef.Type.BOOLEAN, false,
ConfigDef.Importance.MEDIUM, "Whether to use the DynamoDB update API instead of the put (insert) API.")
.define(Keys.TABLE_KEYS_NAMES, ConfigDef.Type.LIST, "", (key, names) -> {
final List<?> namesList = (List<?>) names;
if (namesList.size() > 2)
throw new ConfigException(Keys.TABLE_KEYS_NAMES, names,
"Must be empty or contain up to 2 attribute names mapping to the hash key and (optionally) the range key, but was: " + namesList);
}, ConfigDef.Importance.MEDIUM, "List of ``[hash_key_name[,range_key_name]]``. Must be provided when the " + Keys.USE_UPDATES + " flag is enabled.");

final Regions region;
final Password accessKeyId;
@@ -94,6 +104,8 @@ private enum Keys {
final String topValueAttribute;
final int maxRetries;
final int retryBackoffMs;
final boolean useUpdates;
final List<String> tableKeysNames;

ConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
@@ -109,6 +121,8 @@ private enum Keys {
topValueAttribute = getString(Keys.TOP_VALUE_ATTRIBUTE);
maxRetries = getInt(Keys.MAX_RETRIES);
retryBackoffMs = getInt(Keys.RETRY_BACKOFF_MS);
useUpdates = getBoolean(Keys.USE_UPDATES);
tableKeysNames = getList(Keys.TABLE_KEYS_NAMES);
}

ConnectorConfig(Map<String, String> props) {
105 changes: 84 additions & 21 deletions src/main/java/dynamok/sink/DynamoDbSinkTask.java
@@ -19,14 +19,7 @@
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.amazonaws.services.dynamodbv2.model.*;
import dynamok.Version;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -94,19 +87,11 @@ public void put(Collection<SinkRecord> records) {
if (records.isEmpty()) return;

try {
if (records.size() == 1 || config.batchSize == 1) {
for (final SinkRecord record : records) {
client.putItem(tableName(record), toPutRequest(record).getItem());
}
} else {
final Iterator<SinkRecord> recordIterator = records.iterator();
while (recordIterator.hasNext()) {
final Map<String, List<WriteRequest>> writesByTable = toWritesByTable(recordIterator);
final BatchWriteItemResult batchWriteResponse = client.batchWriteItem(new BatchWriteItemRequest(writesByTable));
if (!batchWriteResponse.getUnprocessedItems().isEmpty()) {
throw new UnprocessedItemsException(batchWriteResponse.getUnprocessedItems());
}
}
if (config.useUpdates) {
updateItems(records);
} else {
putItems(records);
}
} catch (LimitExceededException | ProvisionedThroughputExceededException e) {
log.debug("Write failed with Limit/Throughput Exceeded exception; backing off");
@@ -173,6 +158,84 @@ private void insert(ValueSource valueSource, Schema schema, Object value, PutReq
}
}

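// Converts the record key or value into DynamoDB attribute values and adds them to the
// UpdateItemRequest: attributes named in table.keys.names become key entries, everything
// else becomes an attribute update.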
private void insertUpdate(ValueSource valueSource, Schema schema, Object value, UpdateItemRequest updateItem) {
final AttributeValue attributeValue;
try {
attributeValue = schema == null
? AttributeValueConverter.toAttributeValueSchemaless(value)
: AttributeValueConverter.toAttributeValue(schema, value);
} catch (DataException e) {
log.error("Failed to convert record with schema={} value={}", schema, value, e);
throw e;
}

final String topAttributeName = valueSource.topAttributeName(config);
if (!topAttributeName.isEmpty()) {
AttributeValueUpdate attributeValueUpdate = new AttributeValueUpdate().withValue(attributeValue);
if (config.tableKeysNames.contains(topAttributeName)) {
updateItem.addKeyEntry(topAttributeName, attributeValueUpdate.getValue());
} else {
updateItem.addAttributeUpdatesEntry(topAttributeName, attributeValueUpdate);
}
} else if (attributeValue.getM() != null) {
Map<String, AttributeValue> attributeValueMap = attributeValue.getM();
config.tableKeysNames.forEach(k -> {
if (attributeValueMap.containsKey(k)) {
// Move the key attribute out of the attribute-value map and add it as a key entry instead
updateItem.addKeyEntry(k, attributeValueMap.get(k));
attributeValueMap.remove(k);
}
});
// Register the remaining attributes as attribute updates
Map<String, AttributeValueUpdate> attributeValueUpdateMap = new HashMap<>();
attributeValueMap.forEach((k, v) -> attributeValueUpdateMap.put(k, new AttributeValueUpdate().withValue(v)));
updateItem.setAttributeUpdates(attributeValueUpdateMap);
} else {
throw new ConnectException("No top attribute name configured for " + valueSource + ", and it could not be converted to Map: " + attributeValue);
}
}

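// Writes records with individual PutItem calls when there is a single record (or config.batchSize
// is 1), otherwise groups them into BatchWriteItem requests and surfaces any unprocessed items.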
private void putItems(Collection<SinkRecord> records) throws UnprocessedItemsException {
if (records.size() == 1 || config.batchSize == 1) {
for (final SinkRecord record : records) {
client.putItem(tableName(record), toPutRequest(record).getItem());
}
} else {
final Iterator<SinkRecord> recordIterator = records.iterator();
while (recordIterator.hasNext()) {
final Map<String, List<WriteRequest>> writesByTable = toWritesByTable(recordIterator);
final BatchWriteItemResult batchWriteResponse = client.batchWriteItem(new BatchWriteItemRequest(writesByTable));
if (!batchWriteResponse.getUnprocessedItems().isEmpty()) {
throw new UnprocessedItemsException(batchWriteResponse.getUnprocessedItems());
}
}
}
}

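// Issues one UpdateItem call per record; updates are not batched since BatchWriteItem
// only supports put and delete requests.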
private void updateItems(Collection<SinkRecord> records) {
for (final SinkRecord record : records) {
client.updateItem(toUpdateItemRequest(record).withTableName(tableName(record)));
}
}


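// Builds an UpdateItemRequest from the record value and key (unless ignored by configuration)
// and, when configured, the Kafka topic/partition/offset coordinate attributes.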
private UpdateItemRequest toUpdateItemRequest(SinkRecord record) {
final UpdateItemRequest updateItem = new UpdateItemRequest();
if (!config.ignoreRecordValue) {
insertUpdate(ValueSource.RECORD_VALUE, record.valueSchema(), record.value(), updateItem);
}
if (!config.ignoreRecordKey) {
insertUpdate(ValueSource.RECORD_KEY, record.keySchema(), record.key(), updateItem);
}
if (config.kafkaCoordinateNames != null) {
updateItem.addAttributeUpdatesEntry(config.kafkaCoordinateNames.topic, new AttributeValueUpdate().withValue(new AttributeValue().withS(record.topic())));
updateItem.addAttributeUpdatesEntry(config.kafkaCoordinateNames.partition, new AttributeValueUpdate().withValue(new AttributeValue().withN(String.valueOf(record.kafkaPartition()))));
updateItem.addAttributeUpdatesEntry(config.kafkaCoordinateNames.offset, new AttributeValueUpdate().withValue(new AttributeValue().withN(String.valueOf(record.kafkaOffset()))));
}
return updateItem;
}

private String tableName(SinkRecord record) {
return config.tableFormat.replace("${topic}", record.topic());
}