Commit 7b9d4e8

Incremental migration
1 parent a3bd84a commit 7b9d4e8

5 files changed

+386
-49
lines changed

data-migration/README.md

Lines changed: 61 additions & 11 deletions
@@ -23,6 +23,7 @@ This tool provides a robust framework for migrating data from JSON files to a Po
2323
- **Detailed Logging**: Configurable logging levels
2424
- **Migration Statistics**: Comprehensive reporting of migration results
2525
- **Validation Testing**: Verify data integrity after migration
26+
- **Incremental Updates**: Date-filtered migrations with selective field updates for efficient data synchronization
2627

2728
## Project Structure
2829

@@ -99,39 +100,88 @@ npx prisma migrate dev
99100
```
100101

101102
### Running the Migration
102-
```
103+
```bash
104+
# Run full migration (default)
103105
npm run migrate
104106

107+
# Run incremental migration with date filter
108+
MIGRATION_MODE=incremental INCREMENTAL_SINCE_DATE=2024-01-15T00:00:00Z npm run migrate
109+
110+
# Run incremental migration with selective field updates
111+
MIGRATION_MODE=incremental INCREMENTAL_SINCE_DATE=2024-01-15T00:00:00Z INCREMENTAL_FIELDS=status,updatedAt npm run migrate
112+
```
113+
114+
For more details on incremental migrations, see the `Incremental Updates` section below.
115+
116+
```bash
105117
# Additional commands
106118
npm run migrate:reset # Reset the db and run the migration tool
107119
```
120+
108121
### Configuration Options
109-
You can configure the migration behavior through environment variables:
122+
123+
The migration tool is configurable through environment variables. You can set these in your `.env` file or pass them directly on the command line.
124+
125+
**Database Configuration**
110126
```
111-
# Database connection
112127
DATABASE_URL=postgresql://username:password@localhost:5432/database_name
128+
```
113129

114-
# Migration settings
130+
**Migration Settings**
131+
```
115132
DATA_DIRECTORY=./data
116133
BATCH_SIZE=100
117134
CONCURRENCY_LIMIT=10
118135
LOG_LEVEL=info
136+
```
119137

120-
# Migration behavior
138+
**Migration Behavior**
139+
```
121140
SKIP_MISSING_REQUIRED=false
122141
USE_TRANSACTIONS=true
123142
CHALLENGE_COUNTERS_ONLY=false
143+
MIGRATION_MODE=full
144+
INCREMENTAL_SINCE_DATE=
145+
INCREMENTAL_FIELDS=
124146
MIGRATORS_ONLY=
147+
```
125148

126-
# Migration attribution
149+
**Migration Attribution**
150+
```
127151
CREATED_BY=migration
128152
UPDATED_BY=migration
129153
```
130-
`SKIP_MISSING_REQUIRED` skips the record if required fields are missing. When `false`, default values for required fields must be configured in `src/config.js`
131-
Logfiles are by default stored in `logs/migration.log`
132-
It can be configured using the env variable `LOG_FILE`
133-
Log levels(increasing level of information): `error`, `warn`, `info`, `debug`
134-
Further migration configuration can also be done in `src/config.js`
154+
155+
`SKIP_MISSING_REQUIRED` skips the record if required fields are missing. When `false`, default values for required fields must be configured in `src/config.js`.
156+
`MIGRATION_MODE` controls the migration strategy. Set it to `full` for complete data loads or `incremental` for date-filtered updates. It defaults to `full`, and unrecognized values also fall back to `full`. See the `Incremental Updates` section below for detailed usage.
157+
`INCREMENTAL_SINCE_DATE` specifies the cutoff date for incremental migrations (ISO 8601 format, e.g., `2024-01-15T00:00:00Z`). Only records whose `updatedAt` or `updated` value is later than this date are processed. Set it whenever `MIGRATION_MODE=incremental`; if it is missing or cannot be parsed as a date, the migration manager logs a warning at startup.
158+
`INCREMENTAL_FIELDS` is an optional comma-separated list of field names (e.g., `status,updatedAt,name`) that restricts which fields are updated on existing records during incremental migrations; new records are still inserted in full. When omitted, all fields are updated. The fields `updatedAt` and `updatedBy` are always included. This is useful for targeted updates such as status changes or counter refreshes.
159+
Log files are stored in `logs/migration.log` by default.
160+
You can set a custom location with the `LOG_FILE` environment variable.
161+
Log levels (in increasing verbosity): `error`, `warn`, `info`, `debug`.
162+
Further migration configuration can also be done in `src/config.js`.
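
As a rough illustration of how a comma-separated `INCREMENTAL_FIELDS` value could be turned into a field list, here is a minimal sketch. The helper names `parseIncrementalFields` and `withAuditFields` are hypothetical, and the splitting and trimming rules are assumptions; the tool's own parsing lives in `src/config.js` and may differ.

```javascript
// Hypothetical helper, not the tool's actual implementation.
// Assumption: entries are split on commas, trimmed, and empty entries dropped.
function parseIncrementalFields(value) {
  if (!value) return null;
  const list = String(value)
    .split(',')
    .map(item => item.trim())
    .filter(Boolean);
  // An empty list is treated like "not set", which means "update all fields".
  return list.length ? list : null;
}

// Mirrors the documented rule that updatedAt and updatedBy are always included.
function withAuditFields(fields) {
  if (!fields) return null; // null keeps the "update all fields" behavior
  return [...new Set([...fields, 'updatedAt', 'updatedBy'])];
}

const fields = withAuditFields(parseIncrementalFields('status, updatedAt,,name'));
console.log(fields);
```

Returning `null` for an empty value keeps "not configured" distinct from an explicit list, which matches the documented default of updating every field.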
163+
164+
## Incremental Updates
165+
166+
Incremental updates let you run a full migration once and then keep the database in sync with smaller, targeted refreshes. After the initial load, you can restrict subsequent runs to records changed after a specific date. The migrators update existing rows and insert new records while leaving untouched data in place, which reduces the time needed to migrate data on the final cutover date.
167+
168+
### How It Works
169+
1. **Date filtering**: Only records with `updatedAt` or `updated` values later than `INCREMENTAL_SINCE_DATE` are loaded into memory.
170+
2. **Selective field updates**: When `INCREMENTAL_FIELDS` is set, only those fields are updated on matching database records; otherwise, all fields are considered for updates.
171+
3. **Upsert behavior**: New records are inserted in full, while existing records receive partial or full updates based on your configuration.
172+
173+
All standard validation rules, dependency checks, and relational guarantees remain in effect while running in incremental mode.
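
The date-filtering step described above can be sketched roughly as follows. This is an illustration only: `filterRecordsSince` is a hypothetical name, the sample records are made up, and the real loader may read and filter files differently. Throwing on an invalid cutoff is a choice made for this sketch, not documented behavior of the tool.

```javascript
// Hypothetical sketch of the date filter; not the tool's actual loader.
function filterRecordsSince(records, sinceDate) {
  const cutoff = Date.parse(sinceDate);
  if (Number.isNaN(cutoff)) {
    // Sketch-only behavior: fail fast on an unparseable cutoff.
    throw new Error(`Invalid cutoff date: ${sinceDate}`);
  }
  return records.filter(record => {
    // Prefer updatedAt and fall back to updated, as the README describes.
    const raw = record.updatedAt ?? record.updated;
    if (raw === undefined || raw === null) return false; // no timestamp: skipped
    const timestamp = Date.parse(raw);
    // Keep only records strictly later than the cutoff.
    return !Number.isNaN(timestamp) && timestamp > cutoff;
  });
}

// Made-up sample data for illustration.
const sample = [
  { id: 'a', updatedAt: '2024-01-14T23:59:59Z' },
  { id: 'b', updatedAt: '2024-02-01T08:00:00Z' },
  { id: 'c', updated: '2024-03-10T12:00:00Z' },
  { id: 'd' }
];

const changed = filterRecordsSince(sample, '2024-01-15T00:00:00Z');
console.log(changed.map(record => record.id));
```

Record `d` has neither timestamp field, so it is dropped, which corresponds to the first limitation listed for incremental mode.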
174+
175+
### Configuration
176+
- `MIGRATION_MODE`: Set to `incremental` to enable this workflow (defaults to `full`).
177+
- `INCREMENTAL_SINCE_DATE`: ISO 8601 timestamp that defines the cutoff date (e.g., `2024-01-15T00:00:00Z`). Only records updated after this value are processed.
178+
- `INCREMENTAL_FIELDS`: Optional comma-separated list to limit which fields are updated (e.g., `status,updatedAt,name`). When omitted, all fields are updated; when set, the tool automatically includes `updatedAt` and `updatedBy`.
179+
180+
### Limitations
181+
- Records without `updatedAt` or `updated` fields will be skipped in incremental mode (a warning is logged).
182+
- The `INCREMENTAL_FIELDS` configuration applies globally to all migrators; model-specific field lists are not currently supported.
183+
- Deleted records in the source data are not removed from the database; incremental mode only handles updates and inserts.
184+
- Dependency validation still requires related records to exist; ensure dependent models are included in the incremental run.
135185

136186
### Updating Challenge Counters Only
137187

data-migration/src/config.js

Lines changed: 17 additions & 0 deletions
@@ -7,6 +7,13 @@ const parseListEnv = value => {
77
return list.length ? list : null;
88
};
99

10+
const ALLOWED_MIGRATION_MODES = new Set(['full', 'incremental']);
11+
const parseMigrationMode = value => {
12+
if (!value) return 'full';
13+
const normalized = String(value).toLowerCase();
14+
return ALLOWED_MIGRATION_MODES.has(normalized) ? normalized : 'full';
15+
};
16+
1017
// Default configuration with fallbacks
1118
module.exports = {
1219
// Database connection
@@ -21,6 +28,11 @@ module.exports = {
2128
// Migration behavior
2229
SKIP_MISSING_REQUIRED: process.env.SKIP_MISSING_REQUIRED === 'true',
2330
USE_TRANSACTIONS: process.env.USE_TRANSACTIONS !== 'false',
31+
32+
// Incremental migration settings
33+
MIGRATION_MODE: parseMigrationMode(process.env.MIGRATION_MODE),
34+
INCREMENTAL_SINCE_DATE: process.env.INCREMENTAL_SINCE_DATE || null,
35+
INCREMENTAL_FIELDS: parseListEnv(process.env.INCREMENTAL_FIELDS),
2436

2537
// Migration attribution
2638
CREATED_BY: process.env.CREATED_BY || 'migration',
@@ -409,3 +421,8 @@ module.exports = {
409421
},
410422
},
411423
};
424+
425+
Object.defineProperty(module.exports, 'parseMigrationMode', {
426+
value: parseMigrationMode,
427+
enumerable: false
428+
});

data-migration/src/migrationManager.js

Lines changed: 50 additions & 6 deletions
@@ -1,20 +1,43 @@
11
const { PrismaClient, Prisma } = require('@prisma/client');
2-
const config = require('./config');
2+
const configModule = require('./config');
33
const fs = require('fs');
44
const path = require('path');
55

6+
const { parseMigrationMode } = configModule;
7+
const baseConfig = { ...configModule };
8+
9+
if (typeof parseMigrationMode === 'function') {
10+
delete baseConfig.parseMigrationMode;
11+
}
12+
613
/**
714
* Core migration manager that orchestrates the migration process
815
*/
916
class MigrationManager {
1017
constructor(userConfig = {}) {
18+
const overrideConfig = { ...userConfig };
19+
let pendingMigrationModeWarning = null;
20+
21+
if (Object.prototype.hasOwnProperty.call(overrideConfig, 'MIGRATION_MODE') && typeof parseMigrationMode === 'function') {
22+
const rawMigrationMode = overrideConfig.MIGRATION_MODE;
23+
const normalizedMode = parseMigrationMode(rawMigrationMode);
24+
overrideConfig.MIGRATION_MODE = normalizedMode;
25+
26+
const isNullishOverride = rawMigrationMode === undefined || rawMigrationMode === null;
27+
const matchesAllowedString = typeof rawMigrationMode === 'string' && rawMigrationMode.toLowerCase() === normalizedMode;
28+
29+
if (!isNullishOverride && !matchesAllowedString) {
30+
pendingMigrationModeWarning = rawMigrationMode;
31+
}
32+
}
33+
1134
// Merge default config with user-provided config
1235
this.config = {
13-
...config,
14-
...userConfig,
36+
...baseConfig,
37+
...overrideConfig,
1538
defaultValues: {
16-
...config.defaultValues,
17-
...(userConfig.defaultValues || {})
39+
...baseConfig.defaultValues,
40+
...(overrideConfig.defaultValues || {})
1841
}
1942
};
2043

@@ -29,6 +52,20 @@ class MigrationManager {
2952
this.dependencies = {};
3053
// this.logger = this.createLogger(this.config.LOG_LEVEL);
3154
this.logger = this.createLogger(this.config.LOG_LEVEL, this.config.LOG_FILE || 'migration.log');
55+
if (pendingMigrationModeWarning) {
56+
this.logger.warn(`Invalid MIGRATION_MODE "${pendingMigrationModeWarning}" provided. Defaulting to 'full'.`);
57+
}
58+
59+
if (this.config.MIGRATION_MODE === 'incremental') {
60+
const sinceDate = this.config.INCREMENTAL_SINCE_DATE;
61+
62+
if (!sinceDate) {
63+
this.logger.warn('Incremental migration mode is set but no INCREMENTAL_SINCE_DATE is configured.');
64+
} else if (Number.isNaN(Date.parse(sinceDate))) {
65+
this.logger.warn(`INCREMENTAL_SINCE_DATE "${sinceDate}" is not a valid ISO date string and will be ignored by subsequent phases.`);
66+
this.config.INCREMENTAL_SINCE_DATE = null;
67+
}
68+
}
3269
this.migrators = [];
3370
}
3471

@@ -112,6 +149,13 @@ class MigrationManager {
112149
return this.dependencies[dependencyModel]?.has(id) || false;
113150
}
114151

152+
/**
153+
* Determine if the migration is running in incremental mode
154+
*/
155+
isIncrementalMode() {
156+
return this.config.MIGRATION_MODE === 'incremental';
157+
}
158+
115159
/**
116160
* Stores nested data for a specific model
117161
* @param {String} modelName
@@ -303,4 +347,4 @@ class MigrationManager {
303347
}
304348
}
305349

306-
module.exports = { MigrationManager };
350+
module.exports = { MigrationManager };

data-migration/src/migrators/_baseMigrator.js

Lines changed: 100 additions & 7 deletions
@@ -36,8 +36,22 @@ class BaseMigrator {
3636
/**
3737
* Load data for this migrator
3838
* @returns {Array} The loaded data
39-
*/
39+
*/
4040
async loadData() {
41+
const isIncremental = this.manager.isIncrementalMode();
42+
const sinceDate = this.manager.config.INCREMENTAL_SINCE_DATE ?? null;
43+
44+
if (isIncremental) {
45+
this.manager.logger.debug(`Loading ${this.modelName} data with incremental filter since ${sinceDate || 'unspecified date'}`);
46+
return await loadData(
47+
this.manager.config.DATA_DIRECTORY,
48+
this.getFileName(),
49+
this.isElasticsearch,
50+
sinceDate
51+
);
52+
}
53+
54+
this.manager.logger.debug(`Loading ${this.modelName} data without incremental filtering`);
4155
return await loadData(this.manager.config.DATA_DIRECTORY, this.getFileName(), this.isElasticsearch);
4256
}
4357

@@ -47,15 +61,31 @@ class BaseMigrator {
4761
*/
4862
async migrate() {
4963
this.manager.logger.info(`Migrating ${this.modelName} data...`);
64+
const isIncremental = this.manager.isIncrementalMode();
65+
const sinceDate = this.manager.config.INCREMENTAL_SINCE_DATE ?? 'unspecified date';
66+
const incrementalFields = Array.isArray(this.manager.config.INCREMENTAL_FIELDS)
67+
? this.manager.config.INCREMENTAL_FIELDS
68+
: [];
69+
70+
if (isIncremental) {
71+
this.manager.logger.info(`Running in INCREMENTAL mode since ${sinceDate}`);
72+
if (incrementalFields.length) {
73+
this.manager.logger.info(`Updating only fields: ${incrementalFields.join(', ')}`);
74+
}
75+
} else {
76+
this.manager.logger.info('Running in FULL migration mode');
77+
}
78+
5079
const data = await this.loadData();
5180

5281
// Allow subclasses to perform pre-processing
5382
const processedData = await this.beforeMigration(data);
5483

5584
const processFn = this.createProcessFunction();
5685
const result = await this.manager.processBatch(processedData, processFn);
57-
58-
this.manager.logger.info(`Migrated ${result.processed} ${this.modelName} records (skipped ${result.skipped})`);
86+
87+
const modeLabel = isIncremental ? 'incremental' : 'full';
88+
this.manager.logger.info(`Migrated ${result.processed} ${this.modelName} records (skipped ${result.skipped}) in ${modeLabel} mode`);
5989

6090
// Allow subclasses to perform post-processing
6191
await this.afterMigration(result);
@@ -93,10 +123,16 @@ class BaseMigrator {
93123
return async (batch, prisma, uniqueTracker) => {
94124
let processed = 0;
95125
let skipped = 0;
96-
126+
97127
// Initialize unique trackers if needed
98128
this.initializeUniqueTrackers(uniqueTracker);
99-
129+
130+
if (this.manager.isIncrementalMode()) {
131+
this.manager.logger.debug(`Processing ${this.modelName} batch in incremental mode`);
132+
} else {
133+
this.manager.logger.debug(`Processing ${this.modelName} batch in full migration mode`);
134+
}
135+
100136
for (const _record of batch) {
101137

102138
const record = this.beforeValidation(_record);
@@ -137,7 +173,9 @@ class BaseMigrator {
137173
const finalModelData = this.customizeRecordData(modelData);
138174

139175
// Create upsert data
140-
const upsertData = this.createUpsertData(finalModelData, this.getIdField());
176+
const upsertData = this.manager.isIncrementalMode()
177+
? this.createIncrementalUpsertData(finalModelData, this.getIdField())
178+
: this.createUpsertData(finalModelData, this.getIdField());
141179

142180
// Allow subclasses to modify upsert data if needed
143181
const finalUpsertData = this.customizeUpsertData(upsertData, record);
@@ -243,6 +281,61 @@ class BaseMigrator {
243281
};
244282
}
245283

284+
/**
285+
* Create upsert data when running in incremental mode. Only configured incremental fields
286+
* are updated while new records still receive the full dataset.
287+
* @param {Object} record The processed record data
288+
* @param {string} idField The ID field name
289+
* @returns {{ where: Object, update: Object, create: Object }} The incremental upsert payload
290+
*/
291+
createIncrementalUpsertData(record, idField) {
292+
const incrementalFields = Array.isArray(this.manager.config.INCREMENTAL_FIELDS)
293+
? this.manager.config.INCREMENTAL_FIELDS
294+
: [];
295+
296+
if (!incrementalFields.length) {
297+
return this.createUpsertData(record, idField);
298+
}
299+
300+
const updateData = {};
301+
const missingFields = [];
302+
303+
for (const field of incrementalFields) {
304+
if (field === idField) {
305+
continue;
306+
}
307+
308+
if (record[field] !== undefined) {
309+
updateData[field] = record[field];
310+
} else {
311+
updateData[field] = Prisma.skip;
312+
missingFields.push(field);
313+
}
314+
}
315+
316+
updateData.updatedAt = record.updatedAt ? new Date(record.updatedAt) : new Date();
317+
updateData.updatedBy = record.updatedBy;
318+
319+
if (missingFields.length) {
320+
this._missingIncrementalFieldWarnings = this._missingIncrementalFieldWarnings || new Set();
321+
for (const field of missingFields) {
322+
if (!this._missingIncrementalFieldWarnings.has(field)) {
323+
this._missingIncrementalFieldWarnings.add(field);
324+
this.manager.logger.warn(`Configured incremental field "${field}" is missing on some ${this.modelName} records; skipping updates for this field`);
325+
}
326+
}
327+
}
328+
329+
const createData = { ...record };
330+
createData.createdAt = record.createdAt ? new Date(record.createdAt) : new Date();
331+
332+
return {
333+
where: { [idField]: record[idField] },
334+
update: updateData,
335+
create: createData
336+
};
337+
}
338+
246339
/**
247340
* Initialize unique trackers for this model
248341
*/
@@ -384,4 +477,4 @@ class BaseMigrator {
384477
}
385478
}
386479

387-
module.exports = { BaseMigrator };
480+
module.exports = { BaseMigrator };
