@@ -64,6 +64,7 @@ interface CliOptions {
6464const cliOptions = parseCliOptions ( process . argv . slice ( 2 ) ) ;
6565const CHANGE_WINDOW_START = cliOptions . mode === 'delta' ? cliOptions . since : null ;
6666const deltaTablesLogged = new Set < string > ( ) ;
67+ const deltaTableStats = new Map < string , { fields : string [ ] ; fetches : number ; matched : number } > ( ) ;
6768
6869if ( cliOptions . mode === 'delta' ) {
6970 console . log (
@@ -81,14 +82,20 @@ function applyChangeWindow<T extends Record<string, any>>(
8182 fields : string [ ] ,
8283 label : string
8384) : T {
85+ if ( ! deltaTableStats . has ( label ) ) {
86+ deltaTableStats . set ( label , { fields, fetches : 0 , matched : 0 } ) ;
87+ } else if ( fields . length && deltaTableStats . get ( label ) ! . fields . length === 0 ) {
88+ deltaTableStats . get ( label ) ! . fields = fields ;
89+ }
90+
8491 if ( ! CHANGE_WINDOW_START ) {
8592 return args ;
8693 }
8794
8895 if ( fields . length === 0 ) {
8996 if ( ! deltaTablesLogged . has ( label ) ) {
9097 deltaTablesLogged . add ( label ) ;
91- console . log ( `[delta] ${ label } : no timestamp fields; exporting full set` ) ;
98+ console . log ( `[delta] ${ label } : no timestamp fields; exporting full set (delta filter skipped) ` ) ;
9299 }
93100 return args ;
94101 }
@@ -102,6 +109,7 @@ function applyChangeWindow<T extends Record<string, any>>(
102109 console . log (
103110 `[delta] ${ label } : filtering rows where ${ fields . join ( ' OR ' ) } ≥ ${ CHANGE_WINDOW_START . toISOString ( ) } `
104111 ) ;
112+ console . log ( `[delta] ${ label } : resolved where clause ${ JSON . stringify ( deltaWhere ) } ` ) ;
105113 }
106114
107115 const baseArgs : any = { ...args } ;
@@ -113,6 +121,66 @@ function applyChangeWindow<T extends Record<string, any>>(
113121 return baseArgs ;
114122}
115123
124+ function recordDeltaFetch ( label : string , count : number ) {
125+ if ( ! deltaTableStats . has ( label ) ) {
126+ deltaTableStats . set ( label , { fields : [ ] , fetches : 0 , matched : 0 } ) ;
127+ }
128+ const stats = deltaTableStats . get ( label ) ! ;
129+ stats . fetches += 1 ;
130+ stats . matched += count ;
131+ if ( cliOptions . mode === 'delta' && count === 0 && stats . fetches === 1 ) {
132+ console . warn ( `[delta] ${ label } : first fetch returned 0 rows during delta migration.` ) ;
133+ }
134+ }
135+
136+ function resolvePrismaDelegate ( label : string ) {
137+ if ( ! label ) {
138+ return null ;
139+ }
140+ const [ clientKey , ...modelParts ] = label . split ( '.' ) ;
141+ const modelName = modelParts . join ( '.' ) ;
142+ const client = clientKey === 'target' ? target : clientKey === 'sourceAuth' ? sourceAuth : clientKey === 'sourceIdentity' ? sourceIdentity : null ;
143+ if ( ! client || ! modelName ) {
144+ return null ;
145+ }
146+ const delegate = ( client as any ) [ modelName ] ;
147+ if ( ! delegate || typeof delegate . count !== 'function' ) {
148+ return null ;
149+ }
150+ return delegate ;
151+ }
152+
153+ function buildDeltaWhereClause ( fields : string [ ] ) {
154+ if ( ! CHANGE_WINDOW_START || ! fields || fields . length === 0 ) {
155+ return undefined ;
156+ }
157+ return {
158+ OR : fields . map ( ( field ) => ( { [ field ] : { gte : CHANGE_WINDOW_START } } ) )
159+ } ;
160+ }
161+
162+ function logDeltaTableSummary ( ) {
163+ if ( deltaTableStats . size === 0 ) {
164+ if ( cliOptions . mode === 'delta' ) {
165+ console . log ( '[delta] No delta table statistics were collected.' ) ;
166+ }
167+ return ;
168+ }
169+
170+ console . log ( '[delta] Table fetch summary:' ) ;
171+ deltaTableStats . forEach ( ( stats , label ) => {
172+ const fieldsSummary = stats . fields . length ? stats . fields . join ( ', ' ) : 'n/a' ;
173+ console . log ( `[delta] ${ label } : matched=${ stats . matched } , fetches=${ stats . fetches } , fields=${ fieldsSummary } ` ) ;
174+ if ( cliOptions . mode === 'delta' ) {
175+ if ( stats . matched === 0 ) {
176+ console . warn ( `[delta] ${ label } : matched 0 records; verify incremental filters for this table.` ) ;
177+ } else if ( stats . matched > 100_000 ) {
178+ console . warn ( `[delta] ${ label } : processed ${ stats . matched } records during delta run; confirm the since-date filter is correct.` ) ;
179+ }
180+ }
181+ } ) ;
182+ }
183+
116184function parseCliOptions ( argv : string [ ] ) : CliOptions {
117185 let mode : RunMode | null = null ;
118186 let sinceRaw : string | null = process . env . MIGRATE_SINCE ?? null ;
@@ -263,6 +331,7 @@ async function migrateRoles() {
263331 'sourceAuth.role'
264332 )
265333 ) ;
334+ recordDeltaFetch ( 'sourceAuth.role' , batch . length ) ;
266335 if ( batch . length === 0 ) break ;
267336
268337 await target . $transaction (
@@ -304,6 +373,7 @@ async function migrateClients() {
304373 'sourceAuth.client'
305374 )
306375 ) ;
376+ recordDeltaFetch ( 'sourceAuth.client' , batch . length ) ;
307377 if ( batch . length === 0 ) break ;
308378
309379 await target . $transaction (
@@ -350,6 +420,7 @@ async function migrateRoleAssignments() {
350420 'sourceAuth.roleAssignment'
351421 )
352422 ) ;
423+ recordDeltaFetch ( 'sourceAuth.roleAssignment' , batch . length ) ;
353424 if ( batch . length === 0 ) break ;
354425
355426 await target . $transaction (
@@ -406,6 +477,7 @@ async function migrateAchievementTypeLu() {
406477 'sourceIdentity.achievement_type_lu'
407478 )
408479 ) ;
480+ recordDeltaFetch ( 'sourceIdentity.achievement_type_lu' , batch . length ) ;
409481 if ( batch . length === 0 ) break ;
410482
411483 await target . $transaction (
@@ -445,6 +517,7 @@ async function migrateCountry() {
445517 'sourceIdentity.country'
446518 )
447519 ) ;
520+ recordDeltaFetch ( 'sourceIdentity.country' , batch . length ) ;
448521 if ( batch . length === 0 ) break ;
449522
450523 await target . $transaction (
@@ -502,6 +575,7 @@ async function migrateEmailStatusLu() {
502575 'sourceIdentity.email_status_lu'
503576 )
504577 ) ;
578+ recordDeltaFetch ( 'sourceIdentity.email_status_lu' , batch . length ) ;
505579 if ( batch . length === 0 ) break ;
506580
507581 await target . $transaction (
@@ -544,6 +618,7 @@ async function migrateEmailTypeLu() {
544618 'sourceIdentity.email_type_lu'
545619 )
546620 ) ;
621+ recordDeltaFetch ( 'sourceIdentity.email_type_lu' , batch . length ) ;
547622 if ( batch . length === 0 ) break ;
548623
549624 await target . $transaction (
@@ -586,6 +661,7 @@ async function migrateInvalidHandles() {
586661 'sourceIdentity.invalid_handles'
587662 )
588663 ) ;
664+ recordDeltaFetch ( 'sourceIdentity.invalid_handles' , batch . length ) ;
589665 if ( batch . length === 0 ) break ;
590666
591667 await target . $transaction (
@@ -625,6 +701,7 @@ async function migrateSecurityStatusLu() {
625701 'sourceIdentity.security_status_lu'
626702 )
627703 ) ;
704+ recordDeltaFetch ( 'sourceIdentity.security_status_lu' , batch . length ) ;
628705 if ( batch . length === 0 ) break ;
629706
630707 await target . $transaction (
@@ -664,6 +741,7 @@ async function migrateSecurityGroups() {
664741 'sourceIdentity.security_groups'
665742 )
666743 ) ;
744+ recordDeltaFetch ( 'sourceIdentity.security_groups' , batch . length ) ;
667745 if ( batch . length === 0 ) break ;
668746
669747 await target . $transaction (
@@ -1458,13 +1536,148 @@ async function migrateUserStatus() {
14581536
14591537// ===== Main =====
14601538
1539+ async function validateDeltaMigration ( ) {
1540+ console . log ( '[delta] Performing preflight validation…' ) ;
1541+ if ( cliOptions . mode !== 'delta' ) {
1542+ console . log ( '[delta] Run mode is full; delta validation skipped.' ) ;
1543+ return ;
1544+ }
1545+
1546+ if ( ! CHANGE_WINDOW_START ) {
1547+ throw new Error ( '[delta] Delta mode requires a since-date; none was resolved.' ) ;
1548+ }
1549+
1550+ const now = new Date ( ) ;
1551+ if ( CHANGE_WINDOW_START > now ) {
1552+ throw new Error ( `[delta] Since-date ${ CHANGE_WINDOW_START . toISOString ( ) } is in the future.` ) ;
1553+ }
1554+
1555+ const windowHours = ( now . getTime ( ) - CHANGE_WINDOW_START . getTime ( ) ) / 36e5 ;
1556+ if ( windowHours > 24 * 30 ) {
1557+ const windowDays = Math . round ( windowHours / 24 ) ;
1558+ console . warn ( `[delta] Since-date is ${ windowDays } days old; ensure this wide window is intentional.` ) ;
1559+ }
1560+
1561+ const connectionChecks = [
1562+ { label : 'target' , client : target } ,
1563+ { label : 'sourceAuth' , client : sourceAuth } ,
1564+ { label : 'sourceIdentity' , client : sourceIdentity }
1565+ ] ;
1566+
1567+ for ( const { label, client } of connectionChecks ) {
1568+ try {
1569+ await client . $queryRawUnsafe ( 'SELECT 1' ) ;
1570+ console . log ( `[delta] Connectivity check OK: ${ label } ` ) ;
1571+ } catch ( err : any ) {
1572+ throw new Error ( `[delta] Connectivity check failed for ${ label } : ${ err . message } ` ) ;
1573+ }
1574+ }
1575+
1576+ console . log ( `[delta] Change window start: ${ CHANGE_WINDOW_START . toISOString ( ) } ` ) ;
1577+ console . log ( `[delta] Parallel worker limit: ${ PARALLEL_LIMIT } ` ) ;
1578+ }
1579+
1580+ async function performReferentialIntegrityChecks ( ) {
1581+ if ( cliOptions . mode !== 'delta' ) {
1582+ return ;
1583+ }
1584+
1585+ console . log ( '[delta] Validating basic referential integrity on target…' ) ;
1586+ const checks : Array < { label : string ; query : ( ) => Promise < number > } > = [
1587+ {
1588+ label : 'roleAssignment.role' ,
1589+ query : ( ) => target . roleAssignment . count ( { where : { role : { is : null } } } )
1590+ } ,
1591+ {
1592+ label : 'userEmail.user' ,
1593+ query : ( ) => target . userEmail . count ( { where : { user : { is : null } } } )
1594+ } ,
1595+ {
1596+ label : 'userStatus.user' ,
1597+ query : ( ) => target . userStatus . count ( { where : { user : { is : null } } } )
1598+ }
1599+ ] ;
1600+
1601+ for ( const check of checks ) {
1602+ try {
1603+ const orphans = await check . query ( ) ;
1604+ if ( orphans > 0 ) {
1605+ console . warn ( `[delta] Referential check failed for ${ check . label } : ${ orphans } orphan record(s).` ) ;
1606+ } else {
1607+ console . log ( `[delta] Referential check passed for ${ check . label } .` ) ;
1608+ }
1609+ } catch ( err : any ) {
1610+ console . warn ( `[delta] Referential check errored for ${ check . label } : ${ err . message } ` ) ;
1611+ }
1612+ }
1613+ }
1614+
1615+ async function verifyDeltaMigration ( ) {
1616+ if ( cliOptions . mode !== 'delta' ) {
1617+ logDeltaTableSummary ( ) ;
1618+ return ;
1619+ }
1620+
1621+ console . log ( '[delta] Verifying delta migration results…' ) ;
1622+ const discrepancies : Array < { label : string ; source : number ; target : number } > = [ ] ;
1623+
1624+ for ( const [ label , stats ] of deltaTableStats . entries ( ) ) {
1625+ if ( ! stats . fields . length ) {
1626+ console . log ( `[delta] ${ label } : no delta fields registered; skipping count comparison.` ) ;
1627+ continue ;
1628+ }
1629+
1630+ const sourceDelegate = resolvePrismaDelegate ( label ) ;
1631+ const targetLabel = label . startsWith ( 'sourceAuth.' ) || label . startsWith ( 'sourceIdentity.' )
1632+ ? `target.${ label . split ( '.' ) . slice ( 1 ) . join ( '.' ) } `
1633+ : null ;
1634+ if ( ! sourceDelegate || ! targetLabel ) {
1635+ console . warn ( `[delta] ${ label } : unable to resolve source delegate; skipping verification.` ) ;
1636+ continue ;
1637+ }
1638+
1639+ const targetDelegate = resolvePrismaDelegate ( targetLabel ) ;
1640+ if ( ! targetDelegate ) {
1641+ console . warn ( `[delta] ${ label } : unable to resolve target delegate (${ targetLabel } ); skipping.` ) ;
1642+ continue ;
1643+ }
1644+
1645+ const whereClause = buildDeltaWhereClause ( stats . fields ) ;
1646+ try {
1647+ const [ sourceCount , targetCount ] = await Promise . all ( [
1648+ sourceDelegate . count ( { where : whereClause } ) ,
1649+ targetDelegate . count ( { where : whereClause } )
1650+ ] ) ;
1651+ if ( sourceCount !== targetCount ) {
1652+ discrepancies . push ( { label, source : sourceCount , target : targetCount } ) ;
1653+ console . warn ( `[delta] ${ label } : count mismatch (source=${ sourceCount } , target=${ targetCount } ).` ) ;
1654+ } else {
1655+ console . log ( `[delta] ${ label } : counts OK (${ sourceCount } ).` ) ;
1656+ }
1657+ } catch ( err : any ) {
1658+ console . warn ( `[delta] ${ label } : unable to verify counts (${ err . message } ).` ) ;
1659+ }
1660+ }
1661+
1662+ await performReferentialIntegrityChecks ( ) ;
1663+ logDeltaTableSummary ( ) ;
1664+
1665+ if ( discrepancies . length ) {
1666+ console . warn ( `[delta] Count verification identified ${ discrepancies . length } table(s) with mismatches.` ) ;
1667+ } else {
1668+ console . log ( '[delta] Source and target counts match for monitored tables.' ) ;
1669+ }
1670+ }
1671+
14611672async function main ( ) {
14621673 const descriptor =
14631674 cliOptions . mode === 'delta'
14641675 ? `mode=delta, since=${ CHANGE_WINDOW_START ! . toISOString ( ) } `
14651676 : 'mode=full' ;
14661677 console . log ( `Starting migration… (${ descriptor } )` ) ;
14671678
1679+ await validateDeltaMigration ( ) ;
1680+
14681681 // // 1) Auth (MySQL) → target
14691682 await runParallel ( 'auth (roles + clients)' , [ migrateRoles , migrateClients ] ) ;
14701683 await migrateRoleAssignments ( ) ;
@@ -1499,6 +1712,7 @@ async function main() {
14991712 migrateUserStatus ,
15001713 ] ) ;
15011714
1715+ await verifyDeltaMigration ( ) ;
15021716 console . log ( '✓ Migration complete.' ) ;
15031717}
15041718
0 commit comments