99import org .slf4j .Logger ;
1010import org .slf4j .LoggerFactory ;
1111
12- import java .io .IOException ;
1312import java .util .AbstractMap ;
1413import java .util .ArrayList ;
1514import java .util .HashSet ;
5958 * stored as a single item, this mechanism will not work for extremely large flags or segments.
6059 * </ul>
6160 */
62- final class DynamoDbDataStoreImpl implements PersistentDataStore {
61+ final class DynamoDbDataStoreImpl extends DynamoDbStoreImplBase implements PersistentDataStore {
6362 private static final Logger logger = LoggerFactory .getLogger ("com.launchdarkly.sdk.server.LDClient.DataStore.DynamoDB" );
6463
65- static final String partitionKey = "namespace" ;
66- static final String sortKey = "key" ;
6764 private static final String versionAttribute = "version" ;
6865 private static final String itemJsonAttribute = "item" ;
6966 private static final String deletedItemPlaceholder = "null" ; // DynamoDB doesn't allow empty strings
70- private final DynamoDbClient client ;
71- private final String tableName ;
72- private final String prefix ;
73-
67+
7468 private Runnable updateHook ;
7569
76- DynamoDbDataStoreImpl (DynamoDbClient client , String tableName , String prefix ) {
77- this .client = client ;
78- this .tableName = tableName ;
79- this .prefix = "" .equals (prefix ) ? null : prefix ;
80- }
81-
82- @ Override
83- public void close () throws IOException {
84- client .close ();
70+ DynamoDbDataStoreImpl (DynamoDbClient client , boolean wasExistingClient , String tableName , String prefix ) {
71+ super (client , wasExistingClient , tableName , prefix );
8572 }
8673
8774 @ Override
@@ -95,10 +82,8 @@ public KeyedItems<SerializedItemDescriptor> getAll(DataKind kind) {
9582 List <Map .Entry <String , SerializedItemDescriptor >> itemsOut = new ArrayList <>();
9683 for (QueryResponse resp : client .queryPaginator (makeQueryForKind (kind ).build ())) {
9784 for (Map <String , AttributeValue > item : resp .items ()) {
98- AttributeValue keyAttr = item .get (sortKey );
99- if (keyAttr == null || keyAttr .s () == null ) {
100-
101- } else {
85+ AttributeValue keyAttr = item .get (SORT_KEY );
86+ if (keyAttr != null && keyAttr .s () != null ) {
10287 SerializedItemDescriptor itemOut = unmarshalItem (kind , item );
10388 if (itemOut != null ) {
10489 itemsOut .add (new AbstractMap .SimpleEntry <>(keyAttr .s (), itemOut ));
@@ -135,18 +120,17 @@ public void init(FullDataSet<SerializedItemDescriptor> allData) {
135120 // Now delete any previously existing items whose keys were not in the current data
136121 for (Map .Entry <String , String > combinedKey : unusedOldKeys ) {
137122 if (!combinedKey .getKey ().equals (initedKey ())) {
138- Map < String , AttributeValue > keys = mapOf (
139- partitionKey , AttributeValue . builder (). s ( combinedKey . getKey ()). build (),
140- sortKey , AttributeValue . builder (). s ( combinedKey .getValue ()). build ());
141- requests . add ( WriteRequest . builder (). deleteRequest ( builder -> builder . key ( keys )) .build ());
123+ requests . add ( WriteRequest . builder ()
124+ . deleteRequest ( builder ->
125+ builder . key ( makeKeysMap ( combinedKey . getKey (), combinedKey .getValue ())))
126+ .build ());
142127 }
143128 }
144129
145130 // Now set the special key that we check in initializedInternal()
146- Map <String , AttributeValue > initedItem = mapOf (
147- partitionKey , AttributeValue .builder ().s (initedKey ()).build (),
148- sortKey , AttributeValue .builder ().s (initedKey ()).build ());
149- requests .add (WriteRequest .builder ().putRequest (builder -> builder .item (initedItem )).build ());
131+ requests .add (WriteRequest .builder ()
132+ .putRequest (builder -> builder .item (makeKeysMap (initedKey (), initedKey ())))
133+ .build ());
150134
151135 batchWriteRequests (client , tableName , requests );
152136
@@ -166,8 +150,8 @@ public boolean upsert(DataKind kind, String key, SerializedItemDescriptor newIte
166150 .item (encodedItem )
167151 .conditionExpression ("attribute_not_exists(#namespace) or attribute_not_exists(#key) or :version > #version" )
168152 .expressionAttributeNames (mapOf (
169- "#namespace" , partitionKey ,
170- "#key" , sortKey ,
153+ "#namespace" , PARTITION_KEY ,
154+ "#key" , SORT_KEY ,
171155 "#version" , versionAttribute ))
172156 .expressionAttributeValues (mapOf (
173157 ":version" , AttributeValue .builder ().n (String .valueOf (newItem .getVersion ())).build ()))
@@ -199,11 +183,7 @@ public boolean isStoreAvailable() {
199183 public void setUpdateHook (Runnable updateHook ) {
200184 this .updateHook = updateHook ;
201185 }
202-
203- private String prefixedNamespace (String base ) {
204- return prefix == null ? base : (prefix + ":" + base );
205- }
206-
186+
207187 private String namespaceForKind (DataKind kind ) {
208188 return prefixedNamespace (kind .getName ());
209189 }
@@ -214,7 +194,7 @@ private String initedKey() {
214194
215195 private QueryRequest .Builder makeQueryForKind (DataKind kind ) {
216196 Map <String , Condition > keyConditions = mapOf (
217- partitionKey ,
197+ PARTITION_KEY ,
218198 Condition .builder ()
219199 .comparisonOperator (ComparisonOperator .EQ )
220200 .attributeValueList (AttributeValue .builder ().s (namespaceForKind (kind )).build ())
@@ -226,31 +206,20 @@ private QueryRequest.Builder makeQueryForKind(DataKind kind) {
226206 .keyConditions (keyConditions );
227207 }
228208
229- private GetItemResponse getItemByKeys (String namespace , String key ) {
230- Map <String , AttributeValue > keyMap = mapOf (
231- partitionKey , AttributeValue .builder ().s (namespace ).build (),
232- sortKey , AttributeValue .builder ().s (key ).build ()
233- );
234- return client .getItem (builder -> builder .tableName (tableName )
235- .consistentRead (true )
236- .key (keyMap )
237- );
238- }
239-
240209 private Set <Map .Entry <String , String >> readExistingKeys (FullDataSet <?> kindsFromThisDataSet ) {
241210 Set <Map .Entry <String , String >> keys = new HashSet <>();
242211 for (Map .Entry <DataKind , ?> e : kindsFromThisDataSet .getData ()) {
243212 DataKind kind = e .getKey ();
244213 QueryRequest req = makeQueryForKind (kind )
245214 .projectionExpression ("#namespace, #key" )
246215 .expressionAttributeNames (mapOf (
247- "#namespace" , partitionKey , "#key" , sortKey ))
216+ "#namespace" , PARTITION_KEY , "#key" , SORT_KEY ))
248217 .build ();
249218 QueryIterable queryResults = client .queryPaginator (req );
250219 for (QueryResponse resp : queryResults ) {
251220 for (Map <String , AttributeValue > item : resp .items ()) {
252- String namespace = item .get (partitionKey ).s ();
253- String key = item .get (sortKey ).s ();
221+ String namespace = item .get (PARTITION_KEY ).s ();
222+ String key = item .get (SORT_KEY ).s ();
254223 keys .add (new AbstractMap .SimpleEntry <>(namespace , key ));
255224 }
256225 }
@@ -261,8 +230,8 @@ private Set<Map.Entry<String, String>> readExistingKeys(FullDataSet<?> kindsFrom
261230 private Map <String , AttributeValue > marshalItem (DataKind kind , String key , SerializedItemDescriptor item ) {
262231 String json = item .isDeleted () ? deletedItemPlaceholder : item .getSerializedItem ();
263232 return mapOf (
264- partitionKey , AttributeValue .builder ().s (namespaceForKind (kind )).build (),
265- sortKey , AttributeValue .builder ().s (key ).build (),
233+ PARTITION_KEY , AttributeValue .builder ().s (namespaceForKind (kind )).build (),
234+ SORT_KEY , AttributeValue .builder ().s (key ).build (),
266235 versionAttribute , AttributeValue .builder ().n (String .valueOf (item .getVersion ())).build (),
267236 itemJsonAttribute , AttributeValue .builder ().s (json ).build ()
268237 );
0 commit comments