 const _ = require('lodash')
+const config = require('config')
 const sequelize = require('../../src/models/index')
 const dbHelper = require('../../src/common/db-helper')
 const logger = require('../../src/common/logger')
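The new `config` dependency is read below via `config.get('ES.MAX_BULK_SIZE')` and `config.get('ES.MAX_RESULT_SIZE')`. A minimal sketch of the entries this assumes; the file location and the numeric values are assumptions, not part of this change:

// config/default.js (hypothetical location and values)
module.exports = {
  ES: {
    MAX_BULK_SIZE: 100,    // max documents sent per bulk request
    MAX_RESULT_SIZE: 1000  // max ids queried per batched search
  }
}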
@@ -130,7 +131,7 @@ async function cleanupES (keys) {
   console.log('Existing data in elasticsearch has been deleted!')
 }
 
-async function insertIntoES (modelName, body) {
+async function insertIntoES (modelName, dataset) {
   const esResourceName = modelToESIndexMapping[modelName]
 
   if (!esResourceName) {
@@ -140,36 +141,46 @@ async function insertIntoES (modelName, body) {
   }
 
   if (_.includes(_.keys(topResources), esResourceName)) {
-    await client.index({
-      index: topResources[esResourceName].index,
-      type: topResources[esResourceName].type,
-      id: body.id,
-      body,
-      pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
-      refresh: 'wait_for'
-    })
+    const chunked = _.chunk(dataset, config.get('ES.MAX_BULK_SIZE'))
+    for (const ds of chunked) {
+      const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc])
+      await client.bulk({
+        index: topResources[esResourceName].index,
+        type: topResources[esResourceName].type,
+        body,
+        pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
+        refresh: 'wait_for'
+      })
+    }
   } else if (_.includes(_.keys(userResources), esResourceName)) {
     const userResource = userResources[esResourceName]
 
-    let user
-
-    try {
-      const res = await client.getSource({
+    let users = []
+    // query all users
+    const idsArr = _.chunk(_.uniq(_.map(dataset, 'userId')), config.get('ES.MAX_RESULT_SIZE'))
+    for (const ids of idsArr) {
+      const res = await client.search({
         index: topResources.user.index,
         type: topResources.user.type,
-        id: body.userId
+        size: dataset.length,
+        body: {
+          query: {
+            ids: {
+              values: ids
+            }
+          }
+        }
       })
+      users.push(..._.map(res.body.hits.hits, '_source'))
+    }
 
-      user = res.body
-    } catch (e) {
-      if (e.meta && e.meta.body.error.type === RESOURCE_NOT_FOUND) {
-        logger.info(`The ${modelName} references user with id ${body.userId}, which does not exist. Deleting the reference...`)
+    // remove unreferenced resources
+    for (const data of dataset) {
+      if (!_.some(users, ['id', data.userId])) {
+        logger.info(`The ${modelName} references user with id ${data.userId}, which does not exist. Deleting the reference...`)
         // The user does not exist. Delete the referece records
-        await dbHelper.remove(models[modelName], body.id)
+        await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
-        return
-      } else {
-        throw e
       }
     }
 
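For reference, the `_.flatMap` call used in the bulk branches pairs an action line with each document, which is the request-body shape the Elasticsearch bulk API expects. A small illustration with made-up documents (ids and fields are illustrative only):

// Two documents become four bulk-body entries: action, source, action, source.
const ds = [{ id: 'a1', name: 'x' }, { id: 'b2', name: 'y' }]
const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc])
// body => [
//   { index: { _id: 'a1' } }, { id: 'a1', name: 'x' },
//   { index: { _id: 'b2' } }, { id: 'b2', name: 'y' }
// ]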
@@ -189,65 +200,89 @@ async function insertIntoES (modelName, body) {
       userResource.mappingCreated = true
     }
 
-    const relateId = body[userResource.relateKey]
-
-    if (!user[userResource.propertyName]) {
-      user[userResource.propertyName] = []
-    }
+    users = _.filter(users, user => {
+      if (!user[userResource.propertyName]) {
+        user[userResource.propertyName] = []
+      }
+      let updated = false
+      _.forEach(_.filter(dataset, ['userId', user.id]), body => {
+        const relateId = body[userResource.relateKey]
+        if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
+          logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
+        } else {
+          updated = true
+          user[userResource.propertyName].push(body)
+        }
+      })
+      return updated
+    })
 
-    if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
-      logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
-    } else {
-      user[userResource.propertyName].push(body)
-      await client.index({
+    const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
+    for (const us of chunked) {
+      const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
+      await client.bulk({
         index: topResources.user.index,
         type: topResources.user.type,
-        id: body.userId,
-        body: user,
+        body,
         pipeline: topResources.user.pipeline.id,
         refresh: 'wait_for'
       })
     }
   } else if (_.includes(_.keys(organizationResources), esResourceName)) {
     const orgResource = organizationResources[esResourceName]
 
-    let organization
-
-    try {
-      const res = await client.getSource({
+    let organizations = []
+    // query all organizations
+    const idsArr = _.chunk(_.uniq(_.map(dataset, 'organizationId')), config.get('ES.MAX_RESULT_SIZE'))
+    for (const ids of idsArr) {
+      const res = await client.search({
         index: topResources.organization.index,
         type: topResources.organization.type,
-        id: body.organizationId
+        size: dataset.length,
+        body: {
+          query: {
+            ids: {
+              values: ids
+            }
+          }
+        }
       })
+      organizations.push(..._.map(res.body.hits.hits, '_source'))
+    }
 
-      organization = res.body
-    } catch (e) {
-      if (e.meta && e.meta.body.error.type === RESOURCE_NOT_FOUND) {
-        logger.info(`The ${modelName} references org with id ${body.organizationId}, which does not exist. Deleting the reference...`)
-        // The user does not exist. Delete the referece records
-        await dbHelper.remove(models[modelName], body.id)
+    for (const data of dataset) {
+      if (!_.some(organizations, ['id', data.organizationId])) {
+        logger.info(`The ${modelName} references org with id ${data.organizationId}, which does not exist. Deleting the reference...`)
+        // The org does not exist. Delete the reference records
+        await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
-        return
-      } else {
-        throw e
       }
     }
 
-    const relateId = body[orgResource.relateKey]
-
-    if (!organization[orgResource.propertyName]) {
-      organization[orgResource.propertyName] = []
-    }
+    organizations = _.filter(organizations, organization => {
+      if (!organization[orgResource.propertyName]) {
+        organization[orgResource.propertyName] = []
+      }
+      let updated = false
+      _.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
+        const relateId = body[orgResource.relateKey]
+        if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
+          logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
+        } else {
+          updated = true
+          organization[orgResource.propertyName].push(body)
+        }
+      })
+      return updated
+    })
 
-    if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
-      logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
-    } else {
-      organization[orgResource.propertyName].push(body)
-      await client.index({
+    const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
+    for (const os of chunked) {
+      const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
+      await client.bulk({
         index: topResources.organization.index,
         type: topResources.organization.type,
-        id: body.organizationId,
-        body: organization,
+        body,
         refresh: 'wait_for'
       })
     }
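Both the user and organization branches now batch-fetch the parent documents with an `ids` query instead of one `getSource` call per record. A sketch of a single batched lookup; the index, type, size, and id values are illustrative, the real ones come from `topResources` and the chunked id list:

// Illustrative only: fetch one chunk of parent documents by id in a single request.
const res = await client.search({
  index: 'user',   // assumption: actual value is topResources.user.index
  type: '_doc',    // assumption: actual value is topResources.user.type
  size: 1000,      // must cover the number of ids requested
  body: { query: { ids: { values: ['u1', 'u2', 'u3'] } } }
})
const found = _.map(res.body.hits.hits, '_source')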
@@ -384,8 +419,8 @@ async function main () {
         if (!_.isString(data[i].updatedBy)) {
           data[i].updatedBy = 'tcAdmin'
         }
-        await insertIntoES(key, data[i])
       }
+      await insertIntoES(key, data)
       logger.info('import data for ' + key + ' done')
     } catch (e) {
       logger.error(e)
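With this change, main() passes each model's full data array to insertIntoES once, and the function splits it into ES.MAX_BULK_SIZE batches internally with lodash chunking; a quick illustration of that splitting (values made up):

// _.chunk behavior underpinning both MAX_BULK_SIZE and MAX_RESULT_SIZE batching:
_.chunk(['a', 'b', 'c', 'd', 'e'], 2) // => [['a', 'b'], ['c', 'd'], ['e']]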