Skip to content

Commit 82ee6ee

Browse files
#214 - use bulk api for migrating db to es
1 parent c5a73db commit 82ee6ee

File tree

2 files changed

+22
-15
lines changed

2 files changed

+22
-15
lines changed

scripts/createIndex.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ co(function * createIndex () {
3333
aggregateScore: { type: 'float' },
3434
isPassing: { type: 'boolean' },
3535
legacySubmissionId: { type: 'keyword' },
36-
submissionPhaseId: { type: 'long' },
36+
submissionPhaseId: { type: 'keyword' },
3737
fileType: { type: 'keyword' },
3838
filename: { type: 'keyword' },
3939
review: { type: 'nested' },

scripts/migrateFromDBToES.js

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const esClient = helper.getEsClient()
1717
* @returns {Promise}
1818
*/
1919
function * migrateRecords (tableName) {
20-
let promises = []
20+
let body = []
2121
let batchCounter = 1
2222
const params = {
2323
TableName: tableName
@@ -28,21 +28,23 @@ function * migrateRecords (tableName) {
2828
logger.debug(`Number of ${tableName}s currently fetched from DB - ` + records.Items.length)
2929
let i = 0
3030
for (const item of records.Items) {
31-
const record = {
32-
index: config.get('esConfig.ES_INDEX'),
33-
type: config.get('esConfig.ES_TYPE'),
34-
id: item.id,
35-
body: {
36-
doc: _.extend({ resource: helper.camelize(tableName) }, item),
37-
doc_as_upsert: true
31+
// action
32+
body.push({
33+
index: {
34+
_id: item.id
3835
}
39-
}
40-
promises.push(esClient.update(record))
36+
})
37+
// data
38+
body.push(_.extend({ resource: helper.camelize(tableName) }, item))
4139

4240
if (i % config.ES_BATCH_SIZE === 0) {
4341
logger.debug(`${tableName} - Processing batch # ` + batchCounter)
44-
yield promises
45-
promises = []
42+
yield esClient.bulk({
43+
index: config.get('esConfig.ES_INDEX'),
44+
type: config.get('esConfig.ES_TYPE'),
45+
body
46+
})
47+
body = []
4648
batchCounter++
4749
}
4850
i++
@@ -52,8 +54,13 @@ function * migrateRecords (tableName) {
5254
if (typeof records.LastEvaluatedKey !== 'undefined') {
5355
params.ExclusiveStartKey = records.LastEvaluatedKey
5456
} else {
55-
if (promises.length > 0) {
56-
yield promises
57+
if (body.length > 0) {
58+
logger.debug(`${tableName} - Final batch processing...`)
59+
yield esClient.bulk({
60+
index: config.get('esConfig.ES_INDEX'),
61+
type: config.get('esConfig.ES_TYPE'),
62+
body
63+
})
5764
}
5865
break // If there are no more records to process, exit the loop
5966
}

0 commit comments

Comments
 (0)