@@ -2,17 +2,17 @@ var AWS = require('aws-sdk');
22var dataKind = require ( 'ldclient-node/versioned_data_kind' ) ;
33var winston = require ( 'winston' ) ;
44
5+ var helpers = require ( './dynamodb_helpers' ) ;
56var CachingStoreWrapper = require ( 'ldclient-node/caching_store_wrapper' ) ;
67
7- var initializedToken = { namespace : '$inited' , key : '$inited' } ;
88var defaultCacheTTLSeconds = 15 ;
99
1010function DynamoDBFeatureStore ( tableName , options ) {
1111 var ttl = options && options . cacheTTL ;
1212 if ( ttl === null || ttl === undefined ) {
1313 ttl = defaultCacheTTLSeconds ;
1414 }
15- return new CachingStoreWrapper ( new dynamoDBFeatureStoreInternal ( tableName , options ) , ttl ) ;
15+ return new CachingStoreWrapper ( dynamoDBFeatureStoreInternal ( tableName , options ) , ttl ) ;
1616}
1717
1818function dynamoDBFeatureStoreInternal ( tableName , options ) {
@@ -26,12 +26,15 @@ function dynamoDBFeatureStoreInternal(tableName, options) {
2626 } )
2727 ) ;
2828 var dynamoDBClient = options . dynamoDBClient || new AWS . DynamoDB . DocumentClient ( options . clientOptions ) ;
29+ var prefix = options . prefix || '' ;
2930
30- this . getInternal = function ( kind , key , cb ) {
31+ var store = { } ;
32+
33+ store . getInternal = function ( kind , key , cb ) {
3134 dynamoDBClient . get ( {
3235 TableName : tableName ,
3336 Key : {
34- namespace : kind . namespace ,
37+ namespace : namespaceForKind ( kind ) ,
3538 key : key ,
3639 }
3740 } , function ( err , data ) {
@@ -46,14 +49,9 @@ function dynamoDBFeatureStoreInternal(tableName, options) {
4649 } ) ;
4750 } ;
4851
49- this . getAllInternal = function ( kind , cb ) {
50- var params = {
51- TableName : tableName ,
52- KeyConditionExpression : 'namespace = :namespace' ,
53- FilterExpression : 'attribute_not_exists(deleted) OR deleted = :deleted' ,
54- ExpressionAttributeValues : { ':namespace' : kind . namespace , ':deleted' : false }
55- } ;
56- this . paginationHelper ( params , function ( params , cb ) { return dynamoDBClient . query ( params , cb ) ; } ) . then ( function ( items ) {
52+ store . getAllInternal = function ( kind , cb ) {
53+ var params = queryParamsForNamespace ( kind . namespace ) ;
54+ helpers . queryHelper ( dynamoDBClient , params ) . then ( function ( items ) {
5755 var results = { } ;
5856 for ( var i = 0 ; i < items . length ; i ++ ) {
5957 var item = unmarshalItem ( items [ i ] ) ;
@@ -68,18 +66,17 @@ function dynamoDBFeatureStoreInternal(tableName, options) {
6866 } ) ;
6967 } ;
7068
71- this . initInternal = function ( allData , cb ) {
72- var this_ = this ;
73- this . paginationHelper ( { TableName : tableName } , function ( params , cb ) { return dynamoDBClient . scan ( params , cb ) ; } )
69+ store . initInternal = function ( allData , cb ) {
70+ readExistingItems ( allData )
7471 . then ( function ( existingItems ) {
75- var existingNamespaceKeys = [ ] ;
72+ var existingNamespaceKeys = { } ;
7673 for ( var i = 0 ; i < existingItems . length ; i ++ ) {
7774 existingNamespaceKeys [ makeNamespaceKey ( existingItems [ i ] ) ] = existingItems [ i ] . version ;
7875 }
7976
8077 // Always write the initialized token when we initialize.
81- var ops = [ { PutRequest : { TableName : tableName , Item : initializedToken } } ] ;
82- delete existingNamespaceKeys [ makeNamespaceKey ( initializedToken ) ] ;
78+ var ops = [ { PutRequest : { TableName : tableName , Item : initializedToken ( ) } } ] ;
79+ delete existingNamespaceKeys [ makeNamespaceKey ( initializedToken ( ) ) ] ;
8380
8481 // Write all initial data (with version checks).
8582 for ( var kindNamespace in allData ) {
@@ -104,22 +101,21 @@ function dynamoDBFeatureStoreInternal(tableName, options) {
104101 } } ) ;
105102 }
106103
107- var writePromises = this_ . batchWrite ( ops ) ;
104+ var writePromises = helpers . batchWrite ( dynamoDBClient , tableName , ops ) ;
108105
109- Promise . all ( writePromises ) . then ( function ( ) { cb && cb ( ) ; } ) ;
106+ return Promise . all ( writePromises ) . then ( function ( ) { cb && cb ( ) ; } ) ;
110107 } ,
111108 function ( err ) {
112- logger . error ( 'failed to retrieve initial state : ' + err ) ;
109+ logger . error ( 'failed to initialize : ' + err ) ;
113110 } ) ;
114111 } ;
115112
116- this . upsertInternal = function ( kind , item , cb ) {
113+ store . upsertInternal = function ( kind , item , cb ) {
117114 var params = makePutRequest ( kind , item ) ;
118115
119116 // testUpdateHook is instrumentation, used only by the unit tests
120- var prepare = this . testUpdateHook || function ( prepareCb ) { prepareCb ( ) ; } ;
117+ var prepare = store . testUpdateHook || function ( prepareCb ) { prepareCb ( ) ; } ;
121118
122- var this_ = this ;
123119 prepare ( function ( ) {
124120 dynamoDBClient . put ( params , function ( err ) {
125121 if ( err ) {
@@ -128,7 +124,7 @@ function dynamoDBFeatureStoreInternal(tableName, options) {
128124 cb ( err , null ) ;
129125 return ;
130126 }
131- this_ . getInternal ( kind , item . key , function ( existingItem ) {
127+ store . getInternal ( kind , item . key , function ( existingItem ) {
132128 cb ( null , existingItem ) ;
133129 } ) ;
134130 return ;
@@ -138,68 +134,60 @@ function dynamoDBFeatureStoreInternal(tableName, options) {
138134 } ) ;
139135 } ;
140136
141- this . initializedInternal = function ( cb ) {
137+ store . initializedInternal = function ( cb ) {
138+ var token = initializedToken ( ) ;
142139 dynamoDBClient . get ( {
143140 TableName : tableName ,
144- Key : initializedToken ,
141+ Key : token ,
145142 } , function ( err , data ) {
146143 if ( err ) {
147144 logger . error ( err ) ;
148145 cb ( false ) ;
149146 return ;
150147 }
151- var inited = data . Item && data . Item . key === initializedToken . key ;
148+ var inited = data . Item && data . Item . key === token . key ;
152149 cb ( ! ! inited ) ;
153150 } ) ;
154151 } ;
155152
156- this . close = function ( ) {
153+ store . close = function ( ) {
157154 // The node DynamoDB client is stateless, so close isn't a meaningful operation.
158155 } ;
159156
160- this . batchWrite = function ( ops ) {
161- var writePromises = [ ] ;
162- // BatchWrite can only accept 25 items at a time, so split up the writes into batches of 25.
163- for ( var i = 0 ; i < ops . length ; i += 25 ) {
164- var requestItems = { } ;
165- requestItems [ tableName ] = ops . slice ( i , i + 25 ) ;
166- writePromises . push ( new Promise ( function ( resolve , reject ) {
167- dynamoDBClient . batchWrite ( {
168- RequestItems : requestItems
169- } , function ( err ) {
170- if ( err ) {
171- logger . error ( 'failed to init: ' + err ) ;
172- reject ( ) ;
173- }
174- resolve ( ) ;
175- } ) ;
176- } ) ) ;
177- }
178- return writePromises ;
179- } ;
180-
181- this . paginationHelper = function ( params , executeFn , startKey ) {
182- var this_ = this ;
183- return new Promise ( function ( resolve , reject ) {
184- if ( startKey ) {
185- params [ 'ExclusiveStartKey' ] = startKey ;
186- }
187- executeFn ( params , function ( err , data ) {
188- if ( err ) {
189- reject ( err ) ;
190- return ;
191- }
157+ function queryParamsForNamespace ( namespace ) {
158+ return {
159+ TableName : tableName ,
160+ KeyConditionExpression : 'namespace = :namespace' ,
161+ FilterExpression : 'attribute_not_exists(deleted) OR deleted = :deleted' ,
162+ ExpressionAttributeValues : { ':namespace' : namespace , ':deleted' : false }
163+ } ;
164+ }
192165
193- if ( 'LastEvaluatedKey' in data ) {
194- this_ . paginationHelper ( params , executeFn , data [ 'LastEvaluatedKey' ] ) . then ( function ( nextPageItems ) {
195- resolve ( data . Items . concat ( nextPageItems ) ) ;
196- } ) ;
197- } else {
198- resolve ( data . Items ) ;
199- }
166+ function readExistingItems ( newData ) {
167+ var p = Promise . resolve ( [ ] ) ;
168+ Object . keys ( newData ) . forEach ( function ( namespace ) {
169+ p = p . then ( function ( previousItems ) {
170+ var params = queryParamsForNamespace ( namespace ) ;
171+ return helpers . queryHelper ( dynamoDBClient , params ) . then ( function ( items ) {
172+ return previousItems . concat ( items ) ;
173+ } ) ;
200174 } ) ;
201175 } ) ;
202- } ;
176+ return p ;
177+ }
178+
179+ function prefixedNamespace ( baseNamespace ) {
180+ return prefix ? ( prefix + ':' + baseNamespace ) : baseNamespace ;
181+ }
182+
183+ function namespaceForKind ( kind ) {
184+ return prefixedNamespace ( kind . namespace ) ;
185+ }
186+
187+ function initializedToken ( ) {
188+ var value = prefixedNamespace ( '$inited' ) ;
189+ return { namespace : value , key : value } ;
190+ }
203191
204192 function marshalItem ( kind , item ) {
205193 return {
@@ -235,7 +223,7 @@ function dynamoDBFeatureStoreInternal(tableName, options) {
235223 return item . namespace + '$' + item . key ;
236224 }
237225
238- return this ;
226+ return store ;
239227}
240228
241229module . exports = DynamoDBFeatureStore ;
0 commit comments