From e797ea000a6f1ddac2b8b75e4507bf854506f2a6 Mon Sep 17 00:00:00 2001
From: Or Sher
Date: Tue, 7 Feb 2017 17:16:31 +0200
Subject: [PATCH 1/2] NET-0000 Added updateItem functionality

---
 README.rst                                    |  14 +++
 .../dynamok/sink/AttributeValueConverter.java |   1 +
 .../java/dynamok/sink/ConnectorConfig.java    |  16 ++-
 .../java/dynamok/sink/DynamoDbSinkTask.java   | 106 ++++++++++++++----
 4 files changed, 115 insertions(+), 22 deletions(-)

diff --git a/README.rst b/README.rst
index 295627c..14bd436 100644
--- a/README.rst
+++ b/README.rst
@@ -173,6 +173,20 @@ Configuration options
   * Default: 3000
   * Importance: medium
 
+``use.updates``
+  Whether to use the DynamoDB update API instead of the insert API.
+
+  * Type: boolean
+  * Default: false
+  * Importance: medium
+
+``table.keys.names``
+  List of ``hash_key_name[,range_key_name]``. Must be given when the ``use.updates`` flag is enabled.
+
+  * Type: list
+  * Default: []
+  * Importance: medium
+
 Source Connector
 ================
 
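A minimal sink configuration exercising the two new options might look like the sketch below. The connector class name follows the project's sink package, and the connector name, topic, region, and key attribute names are made-up placeholders, not values from this patch:

    name=dynamodb-sink
    connector.class=dynamok.sink.DynamoDbSinkConnector
    topics=orders
    region=us-east-1
    use.updates=true
    # Hash key name first, then the optional range key name.
    table.keys.names=customer_id,order_id

With ``use.updates=true`` each record is written through the update API, so an existing item with the same key is patched attribute by attribute instead of being replaced wholesale.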
diff --git a/src/main/java/dynamok/sink/AttributeValueConverter.java b/src/main/java/dynamok/sink/AttributeValueConverter.java
index 1365c19..aa416ba 100644
--- a/src/main/java/dynamok/sink/AttributeValueConverter.java
+++ b/src/main/java/dynamok/sink/AttributeValueConverter.java
@@ -17,6 +17,7 @@
 package dynamok.sink;
 
 import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
diff --git a/src/main/java/dynamok/sink/ConnectorConfig.java b/src/main/java/dynamok/sink/ConnectorConfig.java
index 68307c1..5751c3a 100644
--- a/src/main/java/dynamok/sink/ConnectorConfig.java
+++ b/src/main/java/dynamok/sink/ConnectorConfig.java
@@ -43,6 +43,8 @@ private enum Keys {
         static final String TOP_VALUE_ATTRIBUTE = "top.value.attribute";
         static final String MAX_RETRIES = "max.retries";
         static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
+        static final String USE_UPDATES = "use.updates";
+        static final String TABLE_KEYS_NAMES = "table.keys.names";
     }
 
     static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -80,7 +82,15 @@ private enum Keys {
             .define(Keys.MAX_RETRIES, ConfigDef.Type.INT, 10,
                     ConfigDef.Importance.MEDIUM, "The maximum number of times to retry on errors before failing the task.")
             .define(Keys.RETRY_BACKOFF_MS, ConfigDef.Type.INT, 3000,
-                    ConfigDef.Importance.MEDIUM, "The time in milliseconds to wait following an error before a retry attempt is made.");
+                    ConfigDef.Importance.MEDIUM, "The time in milliseconds to wait following an error before a retry attempt is made.")
+            .define(Keys.USE_UPDATES, ConfigDef.Type.BOOLEAN, false,
+                    ConfigDef.Importance.MEDIUM, "Whether to use the DynamoDB update API instead of the insert API.")
+            .define(Keys.TABLE_KEYS_NAMES, ConfigDef.Type.LIST, "", (key, names) -> {
+                final List<?> namesList = (List<?>) names;
+                if (namesList.size() > 2)
+                    throw new ConfigException(key, names,
+                            "Must be empty or contain up to 2 attribute names mapping to hash key and (optionally) range key, but was: " + namesList);
+            }, ConfigDef.Importance.MEDIUM, "List of ``hash_key_name[,range_key_name]``. Must be given when the " + Keys.USE_UPDATES + " flag is enabled.");
 
     final Regions region;
     final Password accessKeyId;
@@ -94,6 +104,8 @@ private enum Keys {
     final String topValueAttribute;
     final int maxRetries;
     final int retryBackoffMs;
+    final boolean useUpdates;
+    final List<String> tableKeysNames;
 
     ConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
@@ -109,6 +121,8 @@ private enum Keys {
         topValueAttribute = getString(Keys.TOP_VALUE_ATTRIBUTE);
         maxRetries = getInt(Keys.MAX_RETRIES);
         retryBackoffMs = getInt(Keys.RETRY_BACKOFF_MS);
+        useUpdates = getBoolean(Keys.USE_UPDATES);
+        tableKeysNames = getList(Keys.TABLE_KEYS_NAMES);
     }
 
     ConnectorConfig(Map<String, String> props) {
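As a quick sanity check on the validator wired up above: empty lists and lists of one or two names must pass, anything longer must be rejected. The standalone harness below is hypothetical, but the rule it exercises mirrors the lambda in ConnectorConfig:

    import org.apache.kafka.common.config.ConfigException;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class TableKeysNamesValidatorSketch {

        // Mirrors the validator registered for table.keys.names above.
        static void ensureValid(String key, List<?> namesList) {
            if (namesList.size() > 2)
                throw new ConfigException(key, namesList,
                        "Must be empty or contain up to 2 attribute names mapping to hash key and (optionally) range key");
        }

        public static void main(String[] args) {
            List<List<String>> cases = Arrays.asList(
                    Collections.<String>emptyList(),         // allowed: use.updates disabled
                    Arrays.asList("hash_key"),               // allowed: hash key only
                    Arrays.asList("hash_key", "range_key"),  // allowed: hash and range key
                    Arrays.asList("a", "b", "c"));           // rejected: too many names
            for (List<String> names : cases) {
                try {
                    ensureValid("table.keys.names", names);
                    System.out.println(names + " -> valid");
                } catch (ConfigException e) {
                    System.out.println(names + " -> " + e.getMessage());
                }
            }
        }
    }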
diff --git a/src/main/java/dynamok/sink/DynamoDbSinkTask.java b/src/main/java/dynamok/sink/DynamoDbSinkTask.java
index 529c3e1..d8bbd75 100644
--- a/src/main/java/dynamok/sink/DynamoDbSinkTask.java
+++ b/src/main/java/dynamok/sink/DynamoDbSinkTask.java
@@ -19,14 +19,7 @@
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
-import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
-import com.amazonaws.services.dynamodbv2.model.AttributeValue;
-import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
-import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
-import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
-import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
-import com.amazonaws.services.dynamodbv2.model.PutRequest;
-import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import com.amazonaws.services.dynamodbv2.model.*;
 import dynamok.Version;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -94,19 +87,11 @@ public void put(Collection<SinkRecord> records) {
         if (records.isEmpty()) return;
         try {
-            if (records.size() == 1 || config.batchSize == 1) {
-                for (final SinkRecord record : records) {
-                    client.putItem(tableName(record), toPutRequest(record).getItem());
-                }
-            } else {
-                final Iterator<SinkRecord> recordIterator = records.iterator();
-                while (recordIterator.hasNext()) {
-                    final Map<String, List<WriteRequest>> writesByTable = toWritesByTable(recordIterator);
-                    final BatchWriteItemResult batchWriteResponse = client.batchWriteItem(new BatchWriteItemRequest(writesByTable));
-                    if (!batchWriteResponse.getUnprocessedItems().isEmpty()) {
-                        throw new UnprocessedItemsException(batchWriteResponse.getUnprocessedItems());
-                    }
-                }
+            if (config.useUpdates) {
+                updateItems(records);
+            } else {
+                putItems(records);
             }
         } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
             log.debug("Write failed with Limit/Throughput Exceeded exception; backing off");
@@ -173,6 +158,85 @@ private void insert(ValueSource valueSource, Schema schema, Object value, PutReq
         }
     }
 
+    private void insertUpdate(ValueSource valueSource, Schema schema, Object value, UpdateItemRequest updateItem) {
+        final AttributeValue attributeValue;
+        try {
+            attributeValue = schema == null
+                    ? AttributeValueConverter.toAttributeValueSchemaless(value)
+                    : AttributeValueConverter.toAttributeValue(schema, value);
+        } catch (DataException e) {
+            log.error("Failed to convert record with schema={} value={}", schema, value, e);
+            throw e;
+        }
+
+        final String topAttributeName = valueSource.topAttributeName(config);
+        if (!topAttributeName.isEmpty()) {
+            final AttributeValueUpdate attributeValueUpdate = new AttributeValueUpdate().withValue(attributeValue);
+            if (config.tableKeysNames.contains(topAttributeName)) {
+                updateItem.addKeyEntry(topAttributeName, attributeValueUpdate.getValue());
+            } else {
+                log.info("tableKeysNames {} does not contain topAttributeName {}", config.tableKeysNames, topAttributeName);
+                updateItem.addAttributeUpdatesEntry(topAttributeName, attributeValueUpdate);
+            }
+        } else if (attributeValue.getM() != null) {
+            final Map<String, AttributeValue> attributeValueMap = attributeValue.getM();
+            config.tableKeysNames.forEach(k -> {
+                if (attributeValueMap.containsKey(k)) {
+                    // Remove the key attribute from the value map and add it as part of the item key.
+                    updateItem.addKeyEntry(k, attributeValueMap.get(k));
+                    attributeValueMap.remove(k);
+                }
+            });
+            // Set the remaining attributes as attribute updates.
+            final Map<String, AttributeValueUpdate> attributeValueUpdateMap = new HashMap<>();
+            attributeValueMap.forEach((k, v) -> attributeValueUpdateMap.put(k, new AttributeValueUpdate().withValue(v)));
+            updateItem.setAttributeUpdates(attributeValueUpdateMap);
+        } else {
+            throw new ConnectException("No top attribute name configured for " + valueSource + ", and it could not be converted to Map: " + attributeValue);
+        }
+    }
+
+    private void putItems(Collection<SinkRecord> records) throws UnprocessedItemsException {
+        if (records.size() == 1 || config.batchSize == 1) {
+            for (final SinkRecord record : records) {
+                client.putItem(tableName(record), toPutRequest(record).getItem());
+            }
+        } else {
+            final Iterator<SinkRecord> recordIterator = records.iterator();
+            while (recordIterator.hasNext()) {
+                final Map<String, List<WriteRequest>> writesByTable = toWritesByTable(recordIterator);
+                final BatchWriteItemResult batchWriteResponse = client.batchWriteItem(new BatchWriteItemRequest(writesByTable));
+                if (!batchWriteResponse.getUnprocessedItems().isEmpty()) {
+                    throw new UnprocessedItemsException(batchWriteResponse.getUnprocessedItems());
+                }
+            }
+        }
+    }
+
+    private void updateItems(Collection<SinkRecord> records) {
+        // BatchWriteItem only supports puts and deletes, so updates go out one request per record.
+        for (final SinkRecord record : records) {
+            client.updateItem(toUpdateItemRequest(record).withTableName(tableName(record)));
+        }
+    }
+
+    private UpdateItemRequest toUpdateItemRequest(SinkRecord record) {
+        final UpdateItemRequest updateItem = new UpdateItemRequest();
+        if (!config.ignoreRecordValue) {
+            insertUpdate(ValueSource.RECORD_VALUE, record.valueSchema(), record.value(), updateItem);
+        }
+        if (!config.ignoreRecordKey) {
+            insertUpdate(ValueSource.RECORD_KEY, record.keySchema(), record.key(), updateItem);
+        }
+        if (config.kafkaCoordinateNames != null) {
+            updateItem.addAttributeUpdatesEntry(config.kafkaCoordinateNames.topic, new AttributeValueUpdate().withValue(new AttributeValue().withS(record.topic())));
+            updateItem.addAttributeUpdatesEntry(config.kafkaCoordinateNames.partition, new AttributeValueUpdate().withValue(new AttributeValue().withN(String.valueOf(record.kafkaPartition()))));
+            updateItem.addAttributeUpdatesEntry(config.kafkaCoordinateNames.offset, new AttributeValueUpdate().withValue(new AttributeValue().withN(String.valueOf(record.kafkaOffset()))));
+        }
+        return updateItem;
+    }
+
     private String tableName(SinkRecord record) {
         return config.tableFormat.replace("${topic}", record.topic());
     }
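To make the update path concrete: with ``table.keys.names=customer_id,order_id`` and a record value that converts to a map holding customer_id, order_id, and status, insertUpdate routes the two key attributes into the request key and turns everything else into attribute updates. The self-contained sketch below reproduces just that splitting step with the same AWS SDK classes; the table name and sample values are made up:

    import com.amazonaws.services.dynamodbv2.model.AttributeValue;
    import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
    import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class UpdateSplitSketch {
        public static void main(String[] args) {
            List<String> tableKeysNames = Arrays.asList("customer_id", "order_id");

            // Stand-in for a converted record value.
            Map<String, AttributeValue> item = new HashMap<>();
            item.put("customer_id", new AttributeValue().withS("c-42"));
            item.put("order_id", new AttributeValue().withN("1001"));
            item.put("status", new AttributeValue().withS("SHIPPED"));

            UpdateItemRequest updateItem = new UpdateItemRequest().withTableName("orders");

            // Same split as insertUpdate: key attributes become the request key,
            // everything else becomes an attribute update.
            Map<String, AttributeValueUpdate> updates = new HashMap<>();
            item.forEach((name, value) -> {
                if (tableKeysNames.contains(name)) {
                    updateItem.addKeyEntry(name, value);
                } else {
                    updates.put(name, new AttributeValueUpdate().withValue(value));
                }
            });
            updateItem.setAttributeUpdates(updates);

            System.out.println("key = " + updateItem.getKey());
            System.out.println("updates = " + updateItem.getAttributeUpdates());
        }
    }

Running it prints a key holding customer_id and order_id plus a single attribute update for status, which is the shape updateItems sends, one UpdateItem request per record.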
From 9a2ce017b3b9daefd1593f09f6f514e4d90206b6 Mon Sep 17 00:00:00 2001
From: Or Sher
Date: Tue, 7 Feb 2017 17:31:04 +0200
Subject: [PATCH 2/2] NET-0000 Removed info logging used for debugging

---
 src/main/java/dynamok/sink/DynamoDbSinkTask.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/main/java/dynamok/sink/DynamoDbSinkTask.java b/src/main/java/dynamok/sink/DynamoDbSinkTask.java
index d8bbd75..2929e51 100644
--- a/src/main/java/dynamok/sink/DynamoDbSinkTask.java
+++ b/src/main/java/dynamok/sink/DynamoDbSinkTask.java
@@ -176,7 +176,6 @@ private void insertUpdate(ValueSource valueSource, Schema schema, Object value,
                 updateItem.addKeyEntry(topAttributeName, attributeValueUpdate.getValue());
             } else {
-                log.info("tableKeysNames {} does not contain topAttributeName {}", config.tableKeysNames, topAttributeName);
                 updateItem.addAttributeUpdatesEntry(topAttributeName, attributeValueUpdate);
             }
         } else if (attributeValue.getM() != null) {