From cd271c64bd252c03f579fda76374731437131663 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 31 Oct 2025 13:53:54 +0100 Subject: [PATCH 01/14] feat: implement rawSql and delete in tinybirdClient --- services/libs/database/src/tinybirdClient.ts | 94 +++++++++++++++++--- 1 file changed, 84 insertions(+), 10 deletions(-) diff --git a/services/libs/database/src/tinybirdClient.ts b/services/libs/database/src/tinybirdClient.ts index 575274c024..aa64562f50 100644 --- a/services/libs/database/src/tinybirdClient.ts +++ b/services/libs/database/src/tinybirdClient.ts @@ -47,6 +47,24 @@ export class TinybirdClient { this.token = process.env.CROWD_TINYBIRD_ACTIVITIES_TOKEN ?? '' } + /** + * Get common headers for Tinybird API requests + * @param contentType - Optional Content-Type header value (default: undefined) + * @returns Headers object for axios requests + */ + private getHeaders(contentType?: string): Record { + const headers: Record = { + Authorization: `Bearer ${this.token}`, + Accept: 'application/json', + } + + if (contentType) { + headers['Content-Type'] = contentType + } + + return headers + } + /** * Call a Tinybird pipe in JSON format. */ @@ -69,10 +87,7 @@ export class TinybirdClient { }` const result = await axios.get(url, { - headers: { - Authorization: `Bearer ${this.token}`, - Accept: 'application/json', - }, + headers: this.getHeaders(), httpsAgent: TinybirdClient.httpsAgent, }) @@ -116,15 +131,74 @@ export class TinybirdClient { } const result = await axios.post(url, body, { - headers: { - Authorization: `Bearer ${this.token}`, - 'Content-Type': 'application/json', - Accept: 'application/json', - }, + headers: this.getHeaders('application/json'), responseType: 'json', httpsAgent: TinybirdClient.httpsAgent, }) return result.data } -} + + /** + * Execute raw SQL query on Tinybird + * Useful for queries that don't go through named pipes (e.g., ALTER TABLE, direct SELECT) + */ + async rawSql(query: string): Promise { + const url = `${this.host}/v0/sql` + + const result = await axios.post( + url, + { q: query }, + { + headers: this.getHeaders('application/json'), + responseType: 'json', + httpsAgent: TinybirdClient.httpsAgent, + }, + ) + + return result.data + } + + /** + * Delete data from a Tinybird datasource using the delete API + * See: https://www.tinybird.co/docs/classic/get-data-in/data-operations/replace-and-delete-data#delete-data-selectively + * + * @param datasourceName - Name of the datasource to delete from + * @param deleteCondition - SQL expression filter (e.g., "repoId = 'xxx'", "id IN ('a', 'b')") + * @returns Job response with job_id and job_url for tracking deletion progress + */ + async deleteDatasource( + datasourceName: string, + deleteCondition: string, + ): Promise<{ + id: string + job_id: string + job_url: string + status: string + job: { + kind: string + id: string + job_id: string + status: string + datasource: { + id: string + name: string + } + delete_condition: string + } + }> { + const url = `${this.host}/v0/datasources/${encodeURIComponent(datasourceName)}/delete` + + const result = await axios.post( + url, + { delete_condition: deleteCondition }, + { + headers: this.getHeaders('application/json'), + responseType: 'json', + httpsAgent: TinybirdClient.httpsAgent, + }, + ) + + return result.data + } +} \ No newline at end of file From b4dba3f786c8006c24dc91f6eb4fc3f6e0ec82ed Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 31 Oct 2025 13:55:03 +0100 Subject: [PATCH 02/14] feat: implement script to delete maintainers & activities from forked repos --- pnpm-lock.yaml | 3 + .../apps/script_executor_worker/package.json | 2 + ...cleanup-fork-activities-and-maintainers.ts | 650 ++++++++++++++++++ 3 files changed, 655 insertions(+) create mode 100644 services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 29a2841b25..4630ba2858 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1266,6 +1266,9 @@ importers: '@crowd/redis': specifier: workspace:* version: link:../../libs/redis + '@crowd/snowflake': + specifier: workspace:* + version: link:../../libs/snowflake '@crowd/types': specifier: workspace:* version: link:../../libs/types diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index c634aad1e3..c020c48e42 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -6,6 +6,7 @@ "start:debug": "CROWD_TEMPORAL_TASKQUEUE=script-executor SERVICE=script-executor-worker LOG_LEVEL=info tsx --inspect=0.0.0.0:9232 src/main.ts", "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", + "cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts", "lint": "npx eslint --ext .ts src --max-warnings=0", "format": "npx prettier --write \"src/**/*.ts\"", "format-check": "npx prettier --check .", @@ -19,6 +20,7 @@ "@crowd/logging": "workspace:*", "@crowd/opensearch": "workspace:*", "@crowd/redis": "workspace:*", + "@crowd/snowflake": "workspace:*", "@crowd/types": "workspace:*", "@temporalio/client": "~1.11.8", "@temporalio/workflow": "~1.11.8", diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts new file mode 100644 index 0000000000..addc591451 --- /dev/null +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -0,0 +1,650 @@ +/** + * Fork Activities and Maintainers Cleanup Script + * + * PROBLEM: + * When a fork repository is onboarded, all activities from the original project are wrongly + * attributed to the new, forked project. This makes the data inaccurate. + * + * SOLUTION: + * This script deletes all activities and maintainers from forked repositories across: + * - Tinybird + * - Snowflake + * - CDP Postgres + * + * Usage: + * # Via package.json script (recommended): + * pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] + * + * # Or directly with tsx: + * npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] + * + * Options: + * --dry-run Display what would be deleted without actually deleting anything + * --skip-snowflake Skip all Snowflake operations (useful for testing without valid Snowflake credentials) + * + * Environment Variables Required: + * CROWD_DB_WRITE_HOST - Postgres write host + * CROWD_DB_PORT - Postgres port + * CROWD_DB_USERNAME - Postgres username + * CROWD_DB_PASSWORD - Postgres password + * CROWD_DB_DATABASE - Postgres database name + * CROWD_TINYBIRD_BASE_URL - Tinybird API base URL + * CROWD_TINYBIRD_ACTIVITIES_TOKEN - Tinybird API token + * CROWD_SNOWFLAKE_PRIVATE_KEY - Snowflake private key + * CROWD_SNOWFLAKE_ACCOUNT - Snowflake account + * CROWD_SNOWFLAKE_USERNAME - Snowflake username + * CROWD_SNOWFLAKE_DATABASE - Snowflake database + * CROWD_SNOWFLAKE_WAREHOUSE - Snowflake warehouse + * CROWD_SNOWFLAKE_ROLE - Snowflake role + */ +import { TinybirdClient, WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' +import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { getServiceChildLogger } from '@crowd/logging' +import { SnowflakeClient } from '@crowd/snowflake' + +const log = getServiceChildLogger('cleanup-fork-activities-script') + +// Type definitions +interface ForkRepository { + id: string + url: string + segmentId: string + forkedFrom: string +} + +interface DatabaseClients { + postgres: QueryExecutor + snowflake: SnowflakeClient | null +} + +/** + * Initialize Postgres connection using QueryExecutor + */ +async function initPostgresClient(): Promise { + log.info('Initializing Postgres connection...') + + const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) + const queryExecutor = pgpQx(dbConnection) + + log.info('Postgres connection established') + return queryExecutor +} + +/** + * Initialize Snowflake client + */ +function initSnowflakeClient(): SnowflakeClient { + log.info('Initializing Snowflake client...') + + const client = new SnowflakeClient({ + privateKeyString: process.env['CROWD_SNOWFLAKE_PRIVATE_KEY'], + account: process.env['CROWD_SNOWFLAKE_ACCOUNT'], + username: process.env['CROWD_SNOWFLAKE_USERNAME'], + database: process.env['CROWD_SNOWFLAKE_DATABASE'], + warehouse: process.env['CROWD_SNOWFLAKE_WAREHOUSE'], + role: process.env['CROWD_SNOWFLAKE_ROLE'], + parentLog: log, + }) + + log.info('Snowflake client initialized') + return client +} + +/** + * Initialize all database clients + */ +async function initDatabaseClients(skipSnowflake: boolean = false): Promise { + log.info('Initializing database clients...') + + const postgres = await initPostgresClient() + let snowflake: SnowflakeClient | null = null + + if (skipSnowflake) { + log.info('Skipping Snowflake client initialization (--skip-snowflake flag set)') + } else { + try { + snowflake = initSnowflakeClient() + } catch (error) { + log.warn(`Failed to initialize Snowflake client: ${error.message}`) + log.warn('Continuing without Snowflake support') + snowflake = null + } + } + + log.info('All database clients initialized successfully') + + return { + postgres, + snowflake, + } +} + +/** + * Lookup a single fork repository by URL + */ +async function lookupForkRepository( + postgres: QueryExecutor, + url: string, +): Promise { + log.info(`Looking up repository by URL: ${url}`) + + const query = ` + SELECT + id, + url, + "segmentId", + "forkedFrom" + FROM git.repositories + WHERE url = $(url) + AND "forkedFrom" IS NOT NULL + ` + + const repo = (await postgres.selectOneOrNone(query, { url })) as ForkRepository | null + + if (!repo) { + // Check if repository exists but is not a fork + const checkQuery = ` + SELECT id, url, "forkedFrom" + FROM git.repositories + WHERE url = $(url) + ` + const check = await postgres.selectOneOrNone(checkQuery, { url }) + + if (!check) { + throw new Error(`Repository not found in database: ${url}`) + } else { + throw new Error(`Repository is not a fork (forkedFrom is null): ${url}`) + } + } + + log.info(`✓ Found fork repository: ${repo.url} → ${repo.forkedFrom}`) + return repo +} + +/** + * Delete maintainers for a fork repository from Postgres + */ +async function deleteMaintainersFromPostgres( + postgres: QueryExecutor, + repoId: string, + repoUrl: string, + dryRun: boolean = false, +): Promise { + if (dryRun) { + log.info(`[DRY RUN] Querying maintainers for repository: ${repoUrl}`) + const query = ` + SELECT COUNT(*) as count + FROM maintainersInternal + WHERE "repoId" = $(repoId) + ` + const result = (await postgres.selectOne(query, { repoId })) as { count: string } + const rowCount = parseInt(result.count, 10) + log.info(`[DRY RUN] Would delete ${rowCount} maintainer(s) from Postgres`) + return rowCount + } + + log.info(`Deleting maintainers for repository: ${repoUrl}`) + const query = ` + DELETE FROM maintainersInternal + WHERE "repoId" = $(repoId) + ` + const rowCount = await postgres.result(query, { repoId }) + log.info(`✓ Deleted ${rowCount} maintainer(s) from Postgres for repository ${repoUrl}`) + return rowCount +} + +/** + * Delete maintainers from Tinybird using the delete API + * Note: Tinybird deletions are async and may take time to reflect + * See: https://www.tinybird.co/docs/classic/get-data-in/data-operations/replace-and-delete-data#delete-data-selectively + */ +async function deleteMaintainersFromTinybird( + tinybird: TinybirdClient, + repoId: string, + repoUrl: string, + dryRun: boolean = false, +): Promise { + if (dryRun) { + log.info(`[DRY RUN] Querying maintainers from Tinybird for repository: ${repoUrl}`) + try { + const query = `SELECT COUNT(*) as count FROM maintainersInternal WHERE repoId = '${repoId}' FORMAT JSON` + const result = await tinybird.rawSql<{ data: Array<{ count: string }> }>(query) + const count = result.data.length > 0 ? parseInt(result.data[0].count, 10) : 0 + log.info(`[DRY RUN] Would delete ${count} maintainer(s) from Tinybird`) + return count + } catch (error) { + log.error(`Failed to query maintainers from Tinybird: ${error.message}`) + throw error + } + } + + log.info(`Deleting maintainers from Tinybird for repository: ${repoUrl}`) + + try { + // Delete from maintainersInternal datasource using repoId + log.info('Deleting from maintainersInternal datasource...') + const deleteCondition = `repoId = '${repoId}'` + const jobResponse = await tinybird.deleteDatasource('maintainersInternal', deleteCondition) + + log.info( + `✓ Submitted deletion job to Tinybird (job_id: ${jobResponse.job_id}, job_url: ${jobResponse.job_url})`, + ) + log.info( + ` Note: Deletions are async and may take time to complete. Check job status at: ${jobResponse.job_url}`, + ) + return 0 // Tinybird deletion is async, so we can't return actual count + } catch (error) { + log.error(`Failed to delete maintainers from Tinybird: ${error.message}`) + throw error + } +} + +/** + * Delete maintainers from Snowflake + */ +async function deleteMaintainersFromSnowflake( + snowflake: SnowflakeClient | null, + repoId: string, + repoUrl: string, + dryRun: boolean = false, +): Promise { + if (!snowflake) { + log.info('Skipping Snowflake maintainer deletion (Snowflake not available)') + return 0 + } + if (dryRun) { + log.info(`[DRY RUN] Querying maintainers from Snowflake for repository: ${repoUrl}`) + try { + const query = `SELECT COUNT(*) as count FROM maintainersInternal WHERE repoId = '${repoId}'` + const result = await snowflake.run<{ COUNT: string }>(query) + const count = result.length > 0 ? parseInt(result[0].COUNT, 10) : 0 + log.info(`[DRY RUN] Would delete ${count} maintainer(s) from Snowflake`) + return count + } catch (error) { + log.error(`Failed to query maintainers from Snowflake: ${error.message}`) + throw error + } + } + + log.info(`Deleting maintainers from Snowflake for repository: ${repoUrl}`) + try { + log.info('Deleting from maintainersInternal table...') + const query = `DELETE FROM maintainersInternal WHERE repoId = '${repoId}'` + await snowflake.run(query) + log.info(`✓ Deleted maintainers from Snowflake`) + return 0 // Snowflake doesn't return count + } catch (error) { + log.error(`Failed to delete maintainers from Snowflake: ${error.message}`) + throw error + } +} + +/** + * Query activity IDs from Tinybird for a fork repository + * This must be done before deletion because Tinybird deletions are asynchronous + */ +async function queryActivityIds( + tinybird: TinybirdClient, + segmentId: string, + channel: string, +): Promise { + log.info(`Querying activity IDs from Tinybird for segment: ${segmentId}, channel: ${channel}`) + + try { + const query = `SELECT activityId FROM activityRelations WHERE segmentId = '${segmentId}' AND channel = '${channel}' AND platform = 'git' FORMAT JSON` + const result = await tinybird.rawSql<{ data: Array<{ activityId: string }> }>(query) + + const activityIds = result.data.map((row) => row.activityId) + log.info(`Found ${activityIds.length} activity ID(s) in Tinybird`) + return activityIds + } catch (error) { + log.error(`Failed to query activity IDs from Tinybird: ${error.message}`) + throw error + } +} + +/** + * Delete activities from Tinybird using the delete API + * Note: Tinybird deletions are async and may take time to reflect + * See: https://www.tinybird.co/docs/classic/get-data-in/data-operations/replace-and-delete-data#delete-data-selectively + */ +async function deleteActivitiesFromTinybird( + tinybird: TinybirdClient, + activityIds: string[], + dryRun: boolean = false, +): Promise { + if (activityIds.length === 0) { + log.info('No activities to delete from Tinybird') + return + } + + if (dryRun) { + log.info(`[DRY RUN] Would delete ${activityIds.length} activities from Tinybird`) + log.info(`[DRY RUN] Would delete from 'activities' datasource: ${activityIds.length} activity(ies)`) + log.info(`[DRY RUN] Would delete from 'activityRelations' datasource: ${activityIds.length} relation(s)`) + return + } + + log.info(`Deleting ${activityIds.length} activities from Tinybird...`) + + // Format activity IDs for SQL IN clause + const idsString = activityIds.map((id) => `'${id}'`).join(',') + + try { + // Delete from activities datasource + log.info('Deleting from activities datasource...') + const activitiesDeleteCondition = `id IN (${idsString})` + const activitiesJobResponse = await tinybird.deleteDatasource('activities', activitiesDeleteCondition) + log.info( + `✓ Submitted deletion job for activities (job_id: ${activitiesJobResponse.job_id})`, + ) + + // Delete from activityRelations datasource + log.info('Deleting from activityRelations datasource...') + const activityRelationsDeleteCondition = `activityId IN (${idsString})` + const activityRelationsJobResponse = await tinybird.deleteDatasource( + 'activityRelations', + activityRelationsDeleteCondition, + ) + log.info( + `✓ Submitted deletion job for activityRelations (job_id: ${activityRelationsJobResponse.job_id})`, + ) + + log.info( + `✓ Submitted deletion jobs to Tinybird (note: deletions are async and may take time to complete)`, + ) + log.info(` Activities job URL: ${activitiesJobResponse.job_url}`) + log.info(` ActivityRelations job URL: ${activityRelationsJobResponse.job_url}`) + } catch (error) { + log.error(`Failed to delete activities from Tinybird: ${error.message}`) + throw error + } +} + +/** + * Delete activity relations from Postgres + */ +async function deleteActivityRelationsFromPostgres( + postgres: QueryExecutor, + activityIds: string[], + dryRun: boolean = false, +): Promise { + if (activityIds.length === 0) { + log.info(`No activity IDs to ${dryRun ? 'query' : 'delete'} from Postgres`) + return 0 + } + + if (dryRun) { + log.info(`[DRY RUN] Querying ${activityIds.length} activity relations from Postgres...`) + const query = ` + SELECT COUNT(*) as count + FROM "activityRelations" + WHERE "activityId" IN ($(activityIds:csv)) + ` + const result = (await postgres.selectOne(query, { activityIds })) as { count: string } + const rowCount = parseInt(result.count, 10) + log.info(`[DRY RUN] Would delete ${rowCount} activity relation(s) from Postgres`) + return rowCount + } + + log.info(`Deleting ${activityIds.length} activity relations from Postgres...`) + const query = ` + DELETE FROM "activityRelations" + WHERE "activityId" IN ($(activityIds:csv)) + ` + const rowCount = await postgres.result(query, { activityIds }) + log.info(`✓ Deleted ${rowCount} activity relation(s) from Postgres`) + return rowCount +} + +/** + * Delete activities and activity relations from Snowflake + */ +async function deleteActivitiesFromSnowflake( + snowflake: SnowflakeClient | null, + activityIds: string[], + dryRun: boolean = false, +): Promise { + if (!snowflake) { + log.info('Skipping Snowflake activity deletion (Snowflake not available)') + return + } + + if (activityIds.length === 0) { + log.info('No activity IDs to delete from Snowflake') + return + } + + if (dryRun) { + log.info(`[DRY RUN] Would delete ${activityIds.length} activities from Snowflake`) + log.info(`[DRY RUN] Would delete from 'activityRelations' table: ${activityIds.length} relation(s)`) + log.info(`[DRY RUN] Would delete from 'activities' table: ${activityIds.length} activity(ies)`) + return + } + + log.info(`Deleting ${activityIds.length} activities from Snowflake...`) + + // Format activity IDs for SQL IN clause + const idsString = activityIds.map((id) => `'${id}'`).join(',') + + try { + // Delete from activityRelations first (foreign key dependency) + log.info('Deleting from activityRelations table...') + const activityRelationsQuery = `DELETE FROM activityRelations WHERE activityId IN (${idsString})` + await snowflake.run(activityRelationsQuery) + + // Delete from activities table + log.info('Deleting from activities table...') + const activitiesQuery = `DELETE FROM activities WHERE id IN (${idsString})` + await snowflake.run(activitiesQuery) + + log.info(`✓ Deleted activities from Snowflake`) + } catch (error) { + log.error(`Failed to delete activities from Snowflake: ${error.message}`) + throw error + } +} + +/** + * Process cleanup for a single fork repository + */ +async function cleanupForkRepository( + clients: DatabaseClients, + repo: ForkRepository, + dryRun: boolean = false, +): Promise { + if (dryRun) { + log.info(`\n${'='.repeat(80)}`) + log.info(`[DRY RUN MODE] Processing: ${repo.url}`) + log.info(`${'='.repeat(80)}`) + } else { + log.info(`\n${'='.repeat(80)}`) + log.info(`Processing: ${repo.url}`) + log.info(`${'='.repeat(80)}`) + } + + try { + // Initialize Tinybird client once for this repository + const tinybird = new TinybirdClient() + + // Step 1: Delete maintainers from all systems + log.info('Deleting maintainers from all systems...') + const maintainersDeletedPostgres = await deleteMaintainersFromPostgres( + clients.postgres, + repo.id, + repo.url, + dryRun, + ) + const maintainersDeletedTinybird = await deleteMaintainersFromTinybird( + tinybird, + repo.id, + repo.url, + dryRun, + ) + const maintainersDeletedSnowflake = await deleteMaintainersFromSnowflake( + clients.snowflake, + repo.id, + repo.url, + dryRun, + ) + + // Step 2: Query activity IDs from Tinybird + const activityIds = await queryActivityIds(tinybird, repo.segmentId, repo.url) + log.info(`Found ${activityIds.length} activity ID(s) to ${dryRun ? 'query' : 'delete'}`) + + if (activityIds.length === 0) { + log.info('No activities to delete, skipping deletion steps') + log.info(`✓ Completed ${dryRun ? 'dry run for' : 'cleanup for'} ${repo.url}`) + log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`) + if (clients.snowflake) { + log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`) + } + log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`) + return + } + + // Step 3: Delete from Tinybird + await deleteActivitiesFromTinybird(tinybird, activityIds, dryRun) + + // Step 4: Delete from Postgres + const activityRelationsDeleted = await deleteActivityRelationsFromPostgres( + clients.postgres, + activityIds, + dryRun, + ) + + // Step 5: Delete from Snowflake + await deleteActivitiesFromSnowflake(clients.snowflake, activityIds, dryRun) + + log.info(`✓ Completed ${dryRun ? 'dry run for' : 'cleanup for'} ${repo.url}`) + log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`) + if (clients.snowflake) { + log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`) + } + log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`) + log.info(` - Activities ${dryRun ? 'found' : 'deleted'}: ${activityIds.length}`) + log.info(` - Activity relations ${dryRun ? 'found' : 'deleted'}: ${activityRelationsDeleted}`) + } catch (error) { + log.error(`Failed to cleanup repository ${repo.url}: ${error.message}`) + throw error + } +} + +/** + * Main entry point + */ +async function main() { + const args = process.argv.slice(2) + + // Parse flags + const dryRunIndex = args.indexOf('--dry-run') + const skipSnowflakeIndex = args.indexOf('--skip-snowflake') + const dryRun = dryRunIndex !== -1 + const skipSnowflake = skipSnowflakeIndex !== -1 + + // Remove flags from args to get URLs + const urls = args.filter( + (arg, index) => index !== dryRunIndex && index !== skipSnowflakeIndex + ) + + if (urls.length === 0) { + log.error(` + Usage: + # Via package.json script (recommended): + pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] + + # Or directly with tsx: + npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] + + Arguments: + repo-url: One or more repository URLs to clean up + --dry-run: (optional) Display what would be deleted without actually deleting anything + --skip-snowflake: (optional) Skip all Snowflake operations (useful for testing without valid Snowflake credentials) + + Examples: + # Clean up a single repository + pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 + + # Clean up multiple repositories + pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 https://github.com/owner/repo2 + + # Dry run to preview what would be deleted + pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --dry-run + + # Skip Snowflake operations (for testing without Snowflake credentials) + pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --skip-snowflake + + # Combine flags + pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --dry-run --skip-snowflake + + Note: + - URLs must exist in git.repositories table + - Repository must be a fork (forkedFrom field must not be null) + `) + process.exit(1) + } + + if (dryRun) { + log.info(`\n${'='.repeat(80)}`) + log.info('🧪 DRY RUN MODE - No data will be deleted') + log.info(`${'='.repeat(80)}\n`) + } + + if (skipSnowflake) { + log.info(`\n${'='.repeat(80)}`) + log.info('⚠️ SNOWFLAKE DISABLED - Skipping all Snowflake operations') + log.info(`${'='.repeat(80)}\n`) + } + + try { + log.info(`Processing ${urls.length} repository URL(s)`) + + // Initialize database clients + const clients = await initDatabaseClients(skipSnowflake) + + // Process cleanup workflow + log.info(`\n${'='.repeat(80)}`) + log.info(`Starting ${dryRun ? 'dry run (preview)' : 'cleanup'} workflow`) + log.info(`${'='.repeat(80)}`) + + let successCount = 0 + let failureCount = 0 + const failedRepos: string[] = [] + + for (const url of urls) { + try { + // Lookup and cleanup in the same loop + const repo = await lookupForkRepository(clients.postgres, url) + await cleanupForkRepository(clients, repo, dryRun) + successCount++ + } catch (error) { + failureCount++ + failedRepos.push(url) + log.error(`Failed to process ${url}: ${error.message}`) + // Continue with next repository instead of stopping + } + } + + // Summary + log.info(`\n${'='.repeat(80)}`) + log.info('Cleanup Summary') + log.info(`${'='.repeat(80)}`) + log.info(`✓ Successfully processed: ${successCount}/${urls.length}`) + if (failureCount > 0) { + log.warn(`✗ Failed: ${failureCount}/${urls.length}`) + log.warn('Failed repositories:') + failedRepos.forEach((url) => log.warn(` - ${url}`)) + } + + process.exit(failureCount > 0 ? 1 : 0) + } catch (error) { + log.error(error, 'Failed to run cleanup script') + log.error(`\n❌ Error: ${error.message}`) + process.exit(1) + } +} + +main().catch((error) => { + log.error('Unexpected error:', error) + process.exit(1) +}) From adb9656dd292f7306032b2ae2dbc00655ffc21bf Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 31 Oct 2025 15:14:18 +0100 Subject: [PATCH 03/14] fix: case sensitive tablename --- .../src/bin/cleanup-fork-activities-and-maintainers.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index addc591451..0d15183ec3 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -174,7 +174,7 @@ async function deleteMaintainersFromPostgres( log.info(`[DRY RUN] Querying maintainers for repository: ${repoUrl}`) const query = ` SELECT COUNT(*) as count - FROM maintainersInternal + FROM "maintainersInternal" WHERE "repoId" = $(repoId) ` const result = (await postgres.selectOne(query, { repoId })) as { count: string } @@ -185,7 +185,7 @@ async function deleteMaintainersFromPostgres( log.info(`Deleting maintainers for repository: ${repoUrl}`) const query = ` - DELETE FROM maintainersInternal + DELETE FROM "maintainersInternal" WHERE "repoId" = $(repoId) ` const rowCount = await postgres.result(query, { repoId }) @@ -255,7 +255,7 @@ async function deleteMaintainersFromSnowflake( if (dryRun) { log.info(`[DRY RUN] Querying maintainers from Snowflake for repository: ${repoUrl}`) try { - const query = `SELECT COUNT(*) as count FROM maintainersInternal WHERE repoId = '${repoId}'` + const query = `SELECT COUNT(*) as count FROM "maintainersInternal" WHERE "repoId" = '${repoId}'` const result = await snowflake.run<{ COUNT: string }>(query) const count = result.length > 0 ? parseInt(result[0].COUNT, 10) : 0 log.info(`[DRY RUN] Would delete ${count} maintainer(s) from Snowflake`) @@ -269,7 +269,7 @@ async function deleteMaintainersFromSnowflake( log.info(`Deleting maintainers from Snowflake for repository: ${repoUrl}`) try { log.info('Deleting from maintainersInternal table...') - const query = `DELETE FROM maintainersInternal WHERE repoId = '${repoId}'` + const query = `DELETE FROM "maintainersInternal" WHERE "repoId" = '${repoId}'` await snowflake.run(query) log.info(`✓ Deleted maintainers from Snowflake`) return 0 // Snowflake doesn't return count From 291f709ed7043a5683491f60006f70bc77034372 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 31 Oct 2025 15:46:32 +0100 Subject: [PATCH 04/14] feat: implement batching --- ...cleanup-fork-activities-and-maintainers.ts | 78 ++++++++++++++----- 1 file changed, 59 insertions(+), 19 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index 0d15183ec3..97ead3eb67 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -13,14 +13,15 @@ * * Usage: * # Via package.json script (recommended): - * pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] + * pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] * * # Or directly with tsx: - * npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] + * npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] * * Options: * --dry-run Display what would be deleted without actually deleting anything * --skip-snowflake Skip all Snowflake operations (useful for testing without valid Snowflake credentials) + * --tb-token Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) * * Environment Variables Required: * CROWD_DB_WRITE_HOST - Postgres write host @@ -452,6 +453,7 @@ async function cleanupForkRepository( clients: DatabaseClients, repo: ForkRepository, dryRun: boolean = false, + tbToken?: string, ): Promise { if (dryRun) { log.info(`\n${'='.repeat(80)}`) @@ -465,7 +467,7 @@ async function cleanupForkRepository( try { // Initialize Tinybird client once for this repository - const tinybird = new TinybirdClient() + const tinybird = new TinybirdClient(tbToken) // Step 1: Delete maintainers from all systems log.info('Deleting maintainers from all systems...') @@ -503,18 +505,37 @@ async function cleanupForkRepository( return } - // Step 3: Delete from Tinybird - await deleteActivitiesFromTinybird(tinybird, activityIds, dryRun) + // Process activities in batches of 500 + const BATCH_SIZE = 500 + const batches: string[][] = [] + for (let i = 0; i < activityIds.length; i += BATCH_SIZE) { + batches.push(activityIds.slice(i, i + BATCH_SIZE)) + } - // Step 4: Delete from Postgres - const activityRelationsDeleted = await deleteActivityRelationsFromPostgres( - clients.postgres, - activityIds, - dryRun, - ) + log.info(`Processing ${activityIds.length} activities in ${batches.length} batch(es) of up to ${BATCH_SIZE}`) + + let totalActivityRelationsDeleted = 0 + + for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) { + const batch = batches[batchIndex] + log.info(`Processing batch ${batchIndex + 1}/${batches.length} (${batch.length} activities)...`) + + // Step 3: Delete from Tinybird + await deleteActivitiesFromTinybird(tinybird, batch, dryRun) - // Step 5: Delete from Snowflake - await deleteActivitiesFromSnowflake(clients.snowflake, activityIds, dryRun) + // Step 4: Delete from Postgres + const activityRelationsDeleted = await deleteActivityRelationsFromPostgres( + clients.postgres, + batch, + dryRun, + ) + totalActivityRelationsDeleted += activityRelationsDeleted + + // Step 5: Delete from Snowflake + await deleteActivitiesFromSnowflake(clients.snowflake, batch, dryRun) + + log.info(`✓ Completed batch ${batchIndex + 1}/${batches.length}`) + } log.info(`✓ Completed ${dryRun ? 'dry run for' : 'cleanup for'} ${repo.url}`) log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`) @@ -523,7 +544,7 @@ async function cleanupForkRepository( } log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`) log.info(` - Activities ${dryRun ? 'found' : 'deleted'}: ${activityIds.length}`) - log.info(` - Activity relations ${dryRun ? 'found' : 'deleted'}: ${activityRelationsDeleted}`) + log.info(` - Activity relations ${dryRun ? 'found' : 'deleted'}: ${totalActivityRelationsDeleted}`) } catch (error) { log.error(`Failed to cleanup repository ${repo.url}: ${error.message}`) throw error @@ -539,27 +560,43 @@ async function main() { // Parse flags const dryRunIndex = args.indexOf('--dry-run') const skipSnowflakeIndex = args.indexOf('--skip-snowflake') + const tbTokenIndex = args.indexOf('--tb-token') const dryRun = dryRunIndex !== -1 const skipSnowflake = skipSnowflakeIndex !== -1 - // Remove flags from args to get URLs + // Extract tb-token value if provided + let tbToken: string | undefined + if (tbTokenIndex !== -1) { + if (tbTokenIndex + 1 >= args.length) { + log.error('Error: --tb-token requires a value') + process.exit(1) + } + tbToken = args[tbTokenIndex + 1] + } + + // Remove flags and their values from args to get URLs const urls = args.filter( - (arg, index) => index !== dryRunIndex && index !== skipSnowflakeIndex + (arg, index) => + index !== dryRunIndex && + index !== skipSnowflakeIndex && + index !== tbTokenIndex && + (tbTokenIndex === -1 || index !== tbTokenIndex + 1) ) if (urls.length === 0) { log.error(` Usage: # Via package.json script (recommended): - pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] + pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] # Or directly with tsx: - npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] + npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] Arguments: repo-url: One or more repository URLs to clean up --dry-run: (optional) Display what would be deleted without actually deleting anything --skip-snowflake: (optional) Skip all Snowflake operations (useful for testing without valid Snowflake credentials) + --tb-token: (optional) Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) Examples: # Clean up a single repository @@ -576,6 +613,9 @@ async function main() { # Combine flags pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --dry-run --skip-snowflake + + # Use custom Tinybird token + pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --tb-token your-token-here Note: - URLs must exist in git.repositories table @@ -615,7 +655,7 @@ async function main() { try { // Lookup and cleanup in the same loop const repo = await lookupForkRepository(clients.postgres, url) - await cleanupForkRepository(clients, repo, dryRun) + await cleanupForkRepository(clients, repo, dryRun, tbToken) successCount++ } catch (error) { failureCount++ From 9912591fcd13c8a52b0b31a90eae243a4f287eb1 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 31 Oct 2025 15:46:48 +0100 Subject: [PATCH 05/14] feat: support custom token --- services/libs/database/src/tinybirdClient.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/libs/database/src/tinybirdClient.ts b/services/libs/database/src/tinybirdClient.ts index aa64562f50..e7c4dfb73d 100644 --- a/services/libs/database/src/tinybirdClient.ts +++ b/services/libs/database/src/tinybirdClient.ts @@ -42,9 +42,9 @@ export class TinybirdClient { keepAlive: false, // Disable keep-alive to avoid stale socket reuse }) - constructor() { + constructor(token?: string) { this.host = process.env.CROWD_TINYBIRD_BASE_URL ?? 'https://api.tinybird.co' - this.token = process.env.CROWD_TINYBIRD_ACTIVITIES_TOKEN ?? '' + this.token = token ?? process.env.CROWD_TINYBIRD_ACTIVITIES_TOKEN ?? '' } /** From 9fc52b0ee234156d6cc972604c36eeb942050d43 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 31 Oct 2025 17:42:06 +0100 Subject: [PATCH 06/14] fix: lint --- ...cleanup-fork-activities-and-maintainers.ts | 74 +++++++++++++------ 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index 97ead3eb67..70b9bb68f9 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -38,7 +38,11 @@ * CROWD_SNOWFLAKE_WAREHOUSE - Snowflake warehouse * CROWD_SNOWFLAKE_ROLE - Snowflake role */ -import { TinybirdClient, WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' +import { + TinybirdClient, + WRITE_DB_CONFIG, + getDbConnection, +} from '@crowd/data-access-layer/src/database' import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceChildLogger } from '@crowd/logging' import { SnowflakeClient } from '@crowd/snowflake' @@ -123,10 +127,7 @@ async function initDatabaseClients(skipSnowflake: boolean = false): Promise { +async function lookupForkRepository(postgres: QueryExecutor, url: string): Promise { log.info(`Looking up repository by URL: ${url}`) const query = ` @@ -321,8 +322,12 @@ async function deleteActivitiesFromTinybird( if (dryRun) { log.info(`[DRY RUN] Would delete ${activityIds.length} activities from Tinybird`) - log.info(`[DRY RUN] Would delete from 'activities' datasource: ${activityIds.length} activity(ies)`) - log.info(`[DRY RUN] Would delete from 'activityRelations' datasource: ${activityIds.length} relation(s)`) + log.info( + `[DRY RUN] Would delete from 'activities' datasource: ${activityIds.length} activity(ies)`, + ) + log.info( + `[DRY RUN] Would delete from 'activityRelations' datasource: ${activityIds.length} relation(s)`, + ) return } @@ -335,10 +340,11 @@ async function deleteActivitiesFromTinybird( // Delete from activities datasource log.info('Deleting from activities datasource...') const activitiesDeleteCondition = `id IN (${idsString})` - const activitiesJobResponse = await tinybird.deleteDatasource('activities', activitiesDeleteCondition) - log.info( - `✓ Submitted deletion job for activities (job_id: ${activitiesJobResponse.job_id})`, + const activitiesJobResponse = await tinybird.deleteDatasource( + 'activities', + activitiesDeleteCondition, ) + log.info(`✓ Submitted deletion job for activities (job_id: ${activitiesJobResponse.job_id})`) // Delete from activityRelations datasource log.info('Deleting from activityRelations datasource...') @@ -418,7 +424,9 @@ async function deleteActivitiesFromSnowflake( if (dryRun) { log.info(`[DRY RUN] Would delete ${activityIds.length} activities from Snowflake`) - log.info(`[DRY RUN] Would delete from 'activityRelations' table: ${activityIds.length} relation(s)`) + log.info( + `[DRY RUN] Would delete from 'activityRelations' table: ${activityIds.length} relation(s)`, + ) log.info(`[DRY RUN] Would delete from 'activities' table: ${activityIds.length} activity(ies)`) return } @@ -497,11 +505,17 @@ async function cleanupForkRepository( if (activityIds.length === 0) { log.info('No activities to delete, skipping deletion steps') log.info(`✓ Completed ${dryRun ? 'dry run for' : 'cleanup for'} ${repo.url}`) - log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`) + log.info( + ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`, + ) if (clients.snowflake) { - log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`) + log.info( + ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`, + ) } - log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`) + log.info( + ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`, + ) return } @@ -512,13 +526,17 @@ async function cleanupForkRepository( batches.push(activityIds.slice(i, i + BATCH_SIZE)) } - log.info(`Processing ${activityIds.length} activities in ${batches.length} batch(es) of up to ${BATCH_SIZE}`) + log.info( + `Processing ${activityIds.length} activities in ${batches.length} batch(es) of up to ${BATCH_SIZE}`, + ) let totalActivityRelationsDeleted = 0 for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) { const batch = batches[batchIndex] - log.info(`Processing batch ${batchIndex + 1}/${batches.length} (${batch.length} activities)...`) + log.info( + `Processing batch ${batchIndex + 1}/${batches.length} (${batch.length} activities)...`, + ) // Step 3: Delete from Tinybird await deleteActivitiesFromTinybird(tinybird, batch, dryRun) @@ -538,13 +556,21 @@ async function cleanupForkRepository( } log.info(`✓ Completed ${dryRun ? 'dry run for' : 'cleanup for'} ${repo.url}`) - log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`) + log.info( + ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`, + ) if (clients.snowflake) { - log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`) + log.info( + ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`, + ) } - log.info(` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`) + log.info( + ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`, + ) log.info(` - Activities ${dryRun ? 'found' : 'deleted'}: ${activityIds.length}`) - log.info(` - Activity relations ${dryRun ? 'found' : 'deleted'}: ${totalActivityRelationsDeleted}`) + log.info( + ` - Activity relations ${dryRun ? 'found' : 'deleted'}: ${totalActivityRelationsDeleted}`, + ) } catch (error) { log.error(`Failed to cleanup repository ${repo.url}: ${error.message}`) throw error @@ -556,14 +582,14 @@ async function cleanupForkRepository( */ async function main() { const args = process.argv.slice(2) - + // Parse flags const dryRunIndex = args.indexOf('--dry-run') const skipSnowflakeIndex = args.indexOf('--skip-snowflake') const tbTokenIndex = args.indexOf('--tb-token') const dryRun = dryRunIndex !== -1 const skipSnowflake = skipSnowflakeIndex !== -1 - + // Extract tb-token value if provided let tbToken: string | undefined if (tbTokenIndex !== -1) { @@ -573,14 +599,14 @@ async function main() { } tbToken = args[tbTokenIndex + 1] } - + // Remove flags and their values from args to get URLs const urls = args.filter( (arg, index) => index !== dryRunIndex && index !== skipSnowflakeIndex && index !== tbTokenIndex && - (tbTokenIndex === -1 || index !== tbTokenIndex + 1) + (tbTokenIndex === -1 || index !== tbTokenIndex + 1), ) if (urls.length === 0) { From 99400c07d43631b69fc22e73e13659b88700cdc2 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Fri, 31 Oct 2025 17:47:48 +0100 Subject: [PATCH 07/14] fix: lint --- services/libs/database/src/tinybirdClient.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/libs/database/src/tinybirdClient.ts b/services/libs/database/src/tinybirdClient.ts index e7c4dfb73d..773b887387 100644 --- a/services/libs/database/src/tinybirdClient.ts +++ b/services/libs/database/src/tinybirdClient.ts @@ -145,7 +145,7 @@ export class TinybirdClient { */ async rawSql(query: string): Promise { const url = `${this.host}/v0/sql` - + const result = await axios.post( url, { q: query }, @@ -162,7 +162,7 @@ export class TinybirdClient { /** * Delete data from a Tinybird datasource using the delete API * See: https://www.tinybird.co/docs/classic/get-data-in/data-operations/replace-and-delete-data#delete-data-selectively - * + * * @param datasourceName - Name of the datasource to delete from * @param deleteCondition - SQL expression filter (e.g., "repoId = 'xxx'", "id IN ('a', 'b')") * @returns Job response with job_id and job_url for tracking deletion progress @@ -201,4 +201,4 @@ export class TinybirdClient { return result.data } -} \ No newline at end of file +} From aca27057d4e3cabb72d2daa6550bbb216402c43a Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Mon, 3 Nov 2025 11:23:19 +0100 Subject: [PATCH 08/14] fix: lint --- .../cleanup-fork-activities-and-maintainers.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index 70b9bb68f9..d523b9548e 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -98,7 +98,7 @@ function initSnowflakeClient(): SnowflakeClient { /** * Initialize all database clients */ -async function initDatabaseClients(skipSnowflake: boolean = false): Promise { +async function initDatabaseClients(skipSnowflake = false): Promise { log.info('Initializing database clients...') const postgres = await initPostgresClient() @@ -170,7 +170,7 @@ async function deleteMaintainersFromPostgres( postgres: QueryExecutor, repoId: string, repoUrl: string, - dryRun: boolean = false, + dryRun = false, ): Promise { if (dryRun) { log.info(`[DRY RUN] Querying maintainers for repository: ${repoUrl}`) @@ -204,7 +204,7 @@ async function deleteMaintainersFromTinybird( tinybird: TinybirdClient, repoId: string, repoUrl: string, - dryRun: boolean = false, + dryRun = false, ): Promise { if (dryRun) { log.info(`[DRY RUN] Querying maintainers from Tinybird for repository: ${repoUrl}`) @@ -248,7 +248,7 @@ async function deleteMaintainersFromSnowflake( snowflake: SnowflakeClient | null, repoId: string, repoUrl: string, - dryRun: boolean = false, + dryRun = false, ): Promise { if (!snowflake) { log.info('Skipping Snowflake maintainer deletion (Snowflake not available)') @@ -313,7 +313,7 @@ async function queryActivityIds( async function deleteActivitiesFromTinybird( tinybird: TinybirdClient, activityIds: string[], - dryRun: boolean = false, + dryRun = false, ): Promise { if (activityIds.length === 0) { log.info('No activities to delete from Tinybird') @@ -374,7 +374,7 @@ async function deleteActivitiesFromTinybird( async function deleteActivityRelationsFromPostgres( postgres: QueryExecutor, activityIds: string[], - dryRun: boolean = false, + dryRun = false, ): Promise { if (activityIds.length === 0) { log.info(`No activity IDs to ${dryRun ? 'query' : 'delete'} from Postgres`) @@ -410,7 +410,7 @@ async function deleteActivityRelationsFromPostgres( async function deleteActivitiesFromSnowflake( snowflake: SnowflakeClient | null, activityIds: string[], - dryRun: boolean = false, + dryRun = false, ): Promise { if (!snowflake) { log.info('Skipping Snowflake activity deletion (Snowflake not available)') @@ -460,7 +460,7 @@ async function deleteActivitiesFromSnowflake( async function cleanupForkRepository( clients: DatabaseClients, repo: ForkRepository, - dryRun: boolean = false, + dryRun = false, tbToken?: string, ): Promise { if (dryRun) { From 9f060aba5b2edba500858f498cd2e0bf5a8a7e0f Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 4 Nov 2025 11:52:59 +0100 Subject: [PATCH 09/14] fix: change deletions order to allow retries in case of failure --- .../src/bin/cleanup-fork-activities-and-maintainers.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index d523b9548e..625eba17c4 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -538,10 +538,7 @@ async function cleanupForkRepository( `Processing batch ${batchIndex + 1}/${batches.length} (${batch.length} activities)...`, ) - // Step 3: Delete from Tinybird - await deleteActivitiesFromTinybird(tinybird, batch, dryRun) - - // Step 4: Delete from Postgres + // Step 3: Delete from Postgres const activityRelationsDeleted = await deleteActivityRelationsFromPostgres( clients.postgres, batch, @@ -549,9 +546,12 @@ async function cleanupForkRepository( ) totalActivityRelationsDeleted += activityRelationsDeleted - // Step 5: Delete from Snowflake + // Step 4: Delete from Snowflake await deleteActivitiesFromSnowflake(clients.snowflake, batch, dryRun) + // Step 5: Delete from Tinybird last (source of truth - delete last so we can retry if needed) + await deleteActivitiesFromTinybird(tinybird, batch, dryRun) + log.info(`✓ Completed batch ${batchIndex + 1}/${batches.length}`) } From 45b7aaf73f07ab9bd965044a581a59addad25289 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 4 Nov 2025 11:53:46 +0100 Subject: [PATCH 10/14] chore: set batch size to 200 instead of 500 --- .../src/bin/cleanup-fork-activities-and-maintainers.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index 625eba17c4..a547e94873 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -519,8 +519,8 @@ async function cleanupForkRepository( return } - // Process activities in batches of 500 - const BATCH_SIZE = 500 + // Process activities in batches of 200 + const BATCH_SIZE = 200 const batches: string[][] = [] for (let i = 0; i < activityIds.length; i += BATCH_SIZE) { batches.push(activityIds.slice(i, i + BATCH_SIZE)) From 306cfe0b7f5234015d3cd2061f6338cefa247162 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 4 Nov 2025 12:07:10 +0100 Subject: [PATCH 11/14] chore: refactor pipeSql to use executeSql --- ...cleanup-fork-activities-and-maintainers.ts | 4 +- services/libs/database/src/tinybirdClient.ts | 38 +++++++------------ 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index a547e94873..9cef3cc7b4 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -210,7 +210,7 @@ async function deleteMaintainersFromTinybird( log.info(`[DRY RUN] Querying maintainers from Tinybird for repository: ${repoUrl}`) try { const query = `SELECT COUNT(*) as count FROM maintainersInternal WHERE repoId = '${repoId}' FORMAT JSON` - const result = await tinybird.rawSql<{ data: Array<{ count: string }> }>(query) + const result = await tinybird.executeSql<{ data: Array<{ count: string }> }>(query) const count = result.data.length > 0 ? parseInt(result.data[0].count, 10) : 0 log.info(`[DRY RUN] Would delete ${count} maintainer(s) from Tinybird`) return count @@ -294,7 +294,7 @@ async function queryActivityIds( try { const query = `SELECT activityId FROM activityRelations WHERE segmentId = '${segmentId}' AND channel = '${channel}' AND platform = 'git' FORMAT JSON` - const result = await tinybird.rawSql<{ data: Array<{ activityId: string }> }>(query) + const result = await tinybird.executeSql<{ data: Array<{ activityId: string }> }>(query) const activityIds = result.data.map((row) => row.activityId) log.info(`Found ${activityIds.length} activity ID(s) in Tinybird`) diff --git a/services/libs/database/src/tinybirdClient.ts b/services/libs/database/src/tinybirdClient.ts index 773b887387..04e070fe9e 100644 --- a/services/libs/database/src/tinybirdClient.ts +++ b/services/libs/database/src/tinybirdClient.ts @@ -107,21 +107,18 @@ export class TinybirdClient { } } - // Compose the final request body - const url = `${this.host}/v0/sql` - const body: Record = { - q: `% SELECT * FROM ${pipeName} FORMAT JSON`, - format: 'json', - } + const query = `% SELECT * FROM ${pipeName} FORMAT JSON` // Copy user params as-is, preserving arrays and primitives + const bodyParams: Record = { format: 'json' } for (const [k, v] of Object.entries(params)) { if (v === undefined || v === null) continue + // Sanity: Tinybird accepts arrays, booleans, numbers, strings if (Array.isArray(v)) { - body[k] = v.map((x) => String(x)) + bodyParams[k] = v.map((x) => String(x)) } else if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { - body[k] = v + bodyParams[k] = v } else { // If you accidentally pass a Date here, throw to avoid silent mismatch throw new Error( @@ -130,31 +127,22 @@ export class TinybirdClient { } } - const result = await axios.post(url, body, { - headers: this.getHeaders('application/json'), - responseType: 'json', - httpsAgent: TinybirdClient.httpsAgent, - }) - - return result.data + return await this.executeSql(query, bodyParams) } /** * Execute raw SQL query on Tinybird * Useful for queries that don't go through named pipes (e.g., ALTER TABLE, direct SELECT) */ - async rawSql(query: string): Promise { + async executeSql(query: string, bodyParams?: Record): Promise { const url = `${this.host}/v0/sql` + const body: Record = { q: query, ...bodyParams } - const result = await axios.post( - url, - { q: query }, - { - headers: this.getHeaders('application/json'), - responseType: 'json', - httpsAgent: TinybirdClient.httpsAgent, - }, - ) + const result = await axios.post(url, body, { + headers: this.getHeaders('application/json'), + responseType: 'json', + httpsAgent: TinybirdClient.httpsAgent, + }) return result.data } From 58502b0b43fc2edbe7dd2b4fee39af7aeda86401 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 18 Nov 2025 11:50:53 +0100 Subject: [PATCH 12/14] chore: remove snowflake --- ...cleanup-fork-activities-and-maintainers.ts | 183 +----------------- 1 file changed, 7 insertions(+), 176 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index 9cef3cc7b4..fffea4bd7c 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -8,19 +8,17 @@ * SOLUTION: * This script deletes all activities and maintainers from forked repositories across: * - Tinybird - * - Snowflake * - CDP Postgres * * Usage: * # Via package.json script (recommended): - * pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] + * pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--tb-token ] * * # Or directly with tsx: - * npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] + * npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--tb-token ] * * Options: * --dry-run Display what would be deleted without actually deleting anything - * --skip-snowflake Skip all Snowflake operations (useful for testing without valid Snowflake credentials) * --tb-token Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) * * Environment Variables Required: @@ -31,12 +29,6 @@ * CROWD_DB_DATABASE - Postgres database name * CROWD_TINYBIRD_BASE_URL - Tinybird API base URL * CROWD_TINYBIRD_ACTIVITIES_TOKEN - Tinybird API token - * CROWD_SNOWFLAKE_PRIVATE_KEY - Snowflake private key - * CROWD_SNOWFLAKE_ACCOUNT - Snowflake account - * CROWD_SNOWFLAKE_USERNAME - Snowflake username - * CROWD_SNOWFLAKE_DATABASE - Snowflake database - * CROWD_SNOWFLAKE_WAREHOUSE - Snowflake warehouse - * CROWD_SNOWFLAKE_ROLE - Snowflake role */ import { TinybirdClient, @@ -45,7 +37,6 @@ import { } from '@crowd/data-access-layer/src/database' import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceChildLogger } from '@crowd/logging' -import { SnowflakeClient } from '@crowd/snowflake' const log = getServiceChildLogger('cleanup-fork-activities-script') @@ -59,7 +50,6 @@ interface ForkRepository { interface DatabaseClients { postgres: QueryExecutor - snowflake: SnowflakeClient | null } /** @@ -75,52 +65,18 @@ async function initPostgresClient(): Promise { return queryExecutor } -/** - * Initialize Snowflake client - */ -function initSnowflakeClient(): SnowflakeClient { - log.info('Initializing Snowflake client...') - - const client = new SnowflakeClient({ - privateKeyString: process.env['CROWD_SNOWFLAKE_PRIVATE_KEY'], - account: process.env['CROWD_SNOWFLAKE_ACCOUNT'], - username: process.env['CROWD_SNOWFLAKE_USERNAME'], - database: process.env['CROWD_SNOWFLAKE_DATABASE'], - warehouse: process.env['CROWD_SNOWFLAKE_WAREHOUSE'], - role: process.env['CROWD_SNOWFLAKE_ROLE'], - parentLog: log, - }) - - log.info('Snowflake client initialized') - return client -} - /** * Initialize all database clients */ -async function initDatabaseClients(skipSnowflake = false): Promise { +async function initDatabaseClients(): Promise { log.info('Initializing database clients...') const postgres = await initPostgresClient() - let snowflake: SnowflakeClient | null = null - - if (skipSnowflake) { - log.info('Skipping Snowflake client initialization (--skip-snowflake flag set)') - } else { - try { - snowflake = initSnowflakeClient() - } catch (error) { - log.warn(`Failed to initialize Snowflake client: ${error.message}`) - log.warn('Continuing without Snowflake support') - snowflake = null - } - } log.info('All database clients initialized successfully') return { postgres, - snowflake, } } @@ -241,46 +197,6 @@ async function deleteMaintainersFromTinybird( } } -/** - * Delete maintainers from Snowflake - */ -async function deleteMaintainersFromSnowflake( - snowflake: SnowflakeClient | null, - repoId: string, - repoUrl: string, - dryRun = false, -): Promise { - if (!snowflake) { - log.info('Skipping Snowflake maintainer deletion (Snowflake not available)') - return 0 - } - if (dryRun) { - log.info(`[DRY RUN] Querying maintainers from Snowflake for repository: ${repoUrl}`) - try { - const query = `SELECT COUNT(*) as count FROM "maintainersInternal" WHERE "repoId" = '${repoId}'` - const result = await snowflake.run<{ COUNT: string }>(query) - const count = result.length > 0 ? parseInt(result[0].COUNT, 10) : 0 - log.info(`[DRY RUN] Would delete ${count} maintainer(s) from Snowflake`) - return count - } catch (error) { - log.error(`Failed to query maintainers from Snowflake: ${error.message}`) - throw error - } - } - - log.info(`Deleting maintainers from Snowflake for repository: ${repoUrl}`) - try { - log.info('Deleting from maintainersInternal table...') - const query = `DELETE FROM "maintainersInternal" WHERE "repoId" = '${repoId}'` - await snowflake.run(query) - log.info(`✓ Deleted maintainers from Snowflake`) - return 0 // Snowflake doesn't return count - } catch (error) { - log.error(`Failed to delete maintainers from Snowflake: ${error.message}`) - throw error - } -} - /** * Query activity IDs from Tinybird for a fork repository * This must be done before deletion because Tinybird deletions are asynchronous @@ -404,56 +320,6 @@ async function deleteActivityRelationsFromPostgres( return rowCount } -/** - * Delete activities and activity relations from Snowflake - */ -async function deleteActivitiesFromSnowflake( - snowflake: SnowflakeClient | null, - activityIds: string[], - dryRun = false, -): Promise { - if (!snowflake) { - log.info('Skipping Snowflake activity deletion (Snowflake not available)') - return - } - - if (activityIds.length === 0) { - log.info('No activity IDs to delete from Snowflake') - return - } - - if (dryRun) { - log.info(`[DRY RUN] Would delete ${activityIds.length} activities from Snowflake`) - log.info( - `[DRY RUN] Would delete from 'activityRelations' table: ${activityIds.length} relation(s)`, - ) - log.info(`[DRY RUN] Would delete from 'activities' table: ${activityIds.length} activity(ies)`) - return - } - - log.info(`Deleting ${activityIds.length} activities from Snowflake...`) - - // Format activity IDs for SQL IN clause - const idsString = activityIds.map((id) => `'${id}'`).join(',') - - try { - // Delete from activityRelations first (foreign key dependency) - log.info('Deleting from activityRelations table...') - const activityRelationsQuery = `DELETE FROM activityRelations WHERE activityId IN (${idsString})` - await snowflake.run(activityRelationsQuery) - - // Delete from activities table - log.info('Deleting from activities table...') - const activitiesQuery = `DELETE FROM activities WHERE id IN (${idsString})` - await snowflake.run(activitiesQuery) - - log.info(`✓ Deleted activities from Snowflake`) - } catch (error) { - log.error(`Failed to delete activities from Snowflake: ${error.message}`) - throw error - } -} - /** * Process cleanup for a single fork repository */ @@ -491,12 +357,6 @@ async function cleanupForkRepository( repo.url, dryRun, ) - const maintainersDeletedSnowflake = await deleteMaintainersFromSnowflake( - clients.snowflake, - repo.id, - repo.url, - dryRun, - ) // Step 2: Query activity IDs from Tinybird const activityIds = await queryActivityIds(tinybird, repo.segmentId, repo.url) @@ -508,11 +368,6 @@ async function cleanupForkRepository( log.info( ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`, ) - if (clients.snowflake) { - log.info( - ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`, - ) - } log.info( ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`, ) @@ -546,10 +401,7 @@ async function cleanupForkRepository( ) totalActivityRelationsDeleted += activityRelationsDeleted - // Step 4: Delete from Snowflake - await deleteActivitiesFromSnowflake(clients.snowflake, batch, dryRun) - - // Step 5: Delete from Tinybird last (source of truth - delete last so we can retry if needed) + // Step 4: Delete from Tinybird last (source of truth - delete last so we can retry if needed) await deleteActivitiesFromTinybird(tinybird, batch, dryRun) log.info(`✓ Completed batch ${batchIndex + 1}/${batches.length}`) @@ -559,11 +411,6 @@ async function cleanupForkRepository( log.info( ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Postgres): ${maintainersDeletedPostgres}`, ) - if (clients.snowflake) { - log.info( - ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Snowflake): ${maintainersDeletedSnowflake}`, - ) - } log.info( ` - Maintainers ${dryRun ? 'found' : 'deleted'} (Tinybird): ${maintainersDeletedTinybird}`, ) @@ -585,10 +432,8 @@ async function main() { // Parse flags const dryRunIndex = args.indexOf('--dry-run') - const skipSnowflakeIndex = args.indexOf('--skip-snowflake') const tbTokenIndex = args.indexOf('--tb-token') const dryRun = dryRunIndex !== -1 - const skipSnowflake = skipSnowflakeIndex !== -1 // Extract tb-token value if provided let tbToken: string | undefined @@ -604,7 +449,6 @@ async function main() { const urls = args.filter( (arg, index) => index !== dryRunIndex && - index !== skipSnowflakeIndex && index !== tbTokenIndex && (tbTokenIndex === -1 || index !== tbTokenIndex + 1), ) @@ -613,15 +457,14 @@ async function main() { log.error(` Usage: # Via package.json script (recommended): - pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] + pnpm run cleanup-fork-activities -- [ ...] [--dry-run] [--tb-token ] # Or directly with tsx: - npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--skip-snowflake] [--tb-token ] + npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts [ ...] [--dry-run] [--tb-token ] Arguments: repo-url: One or more repository URLs to clean up --dry-run: (optional) Display what would be deleted without actually deleting anything - --skip-snowflake: (optional) Skip all Snowflake operations (useful for testing without valid Snowflake credentials) --tb-token: (optional) Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) Examples: @@ -634,12 +477,6 @@ async function main() { # Dry run to preview what would be deleted pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --dry-run - # Skip Snowflake operations (for testing without Snowflake credentials) - pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --skip-snowflake - - # Combine flags - pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --dry-run --skip-snowflake - # Use custom Tinybird token pnpm run cleanup-fork-activities -- https://github.com/owner/repo1 --tb-token your-token-here @@ -656,17 +493,11 @@ async function main() { log.info(`${'='.repeat(80)}\n`) } - if (skipSnowflake) { - log.info(`\n${'='.repeat(80)}`) - log.info('⚠️ SNOWFLAKE DISABLED - Skipping all Snowflake operations') - log.info(`${'='.repeat(80)}\n`) - } - try { log.info(`Processing ${urls.length} repository URL(s)`) // Initialize database clients - const clients = await initDatabaseClients(skipSnowflake) + const clients = await initDatabaseClients() // Process cleanup workflow log.info(`\n${'='.repeat(80)}`) From 8729ead9d6f5045c816ffb25fb7e6f35cdbc5843 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 18 Nov 2025 13:10:42 +0100 Subject: [PATCH 13/14] fix: tinybird delete endpoint payload --- services/libs/database/src/tinybirdClient.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/services/libs/database/src/tinybirdClient.ts b/services/libs/database/src/tinybirdClient.ts index 04e070fe9e..76a08f6601 100644 --- a/services/libs/database/src/tinybirdClient.ts +++ b/services/libs/database/src/tinybirdClient.ts @@ -177,11 +177,14 @@ export class TinybirdClient { }> { const url = `${this.host}/v0/datasources/${encodeURIComponent(datasourceName)}/delete` + // Tinybird expects URL-encoded form data, not JSON + const payload = `delete_condition=${encodeURIComponent(deleteCondition)}` + const result = await axios.post( url, - { delete_condition: deleteCondition }, + payload, { - headers: this.getHeaders('application/json'), + headers: this.getHeaders('application/x-www-form-urlencoded'), responseType: 'json', httpsAgent: TinybirdClient.httpsAgent, }, From 4a1f79d578ac6f65bb9745a084640d08ffbe6e50 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Tue, 18 Nov 2025 17:05:38 +0100 Subject: [PATCH 14/14] fix: enable repos locking while deleting --- ...cleanup-fork-activities-and-maintainers.ts | 58 +++++++++++++++++++ services/libs/database/src/tinybirdClient.ts | 14 ++--- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts index fffea4bd7c..97d508f390 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-fork-activities-and-maintainers.ts @@ -119,6 +119,58 @@ async function lookupForkRepository(postgres: QueryExecutor, url: string): Promi return repo } +/** + * Lock a repository to prevent concurrent cleanup operations + */ +async function lockRepository( + postgres: QueryExecutor, + repoId: string, + repoUrl: string, + dryRun = false, +): Promise { + if (dryRun) { + log.info(`[DRY RUN] Would lock repository: ${repoUrl}`) + return + } + + log.info(`Locking repository: ${repoUrl}`) + + const query = ` + UPDATE git.repositories + SET "lockedAt" = NOW() + WHERE id = $(repoId) + ` + + await postgres.result(query, { repoId }) + log.info(`✓ Repository locked: ${repoUrl}`) +} + +/** + * Unlock a repository after cleanup is complete + */ +async function unlockRepository( + postgres: QueryExecutor, + repoId: string, + repoUrl: string, + dryRun = false, +): Promise { + if (dryRun) { + log.info(`[DRY RUN] Would unlock repository: ${repoUrl}`) + return + } + + log.info(`Unlocking repository: ${repoUrl}`) + + const query = ` + UPDATE git.repositories + SET "lockedAt" = NULL + WHERE id = $(repoId) + ` + + await postgres.result(query, { repoId }) + log.info(`✓ Repository unlocked: ${repoUrl}`) +} + /** * Delete maintainers for a fork repository from Postgres */ @@ -339,6 +391,9 @@ async function cleanupForkRepository( log.info(`${'='.repeat(80)}`) } + // Lock repository to prevent concurrent operations + await lockRepository(clients.postgres, repo.id, repo.url, dryRun) + try { // Initialize Tinybird client once for this repository const tinybird = new TinybirdClient(tbToken) @@ -421,6 +476,9 @@ async function cleanupForkRepository( } catch (error) { log.error(`Failed to cleanup repository ${repo.url}: ${error.message}`) throw error + } finally { + // Always unlock repository, even if cleanup failed + await unlockRepository(clients.postgres, repo.id, repo.url, dryRun) } } diff --git a/services/libs/database/src/tinybirdClient.ts b/services/libs/database/src/tinybirdClient.ts index 76a08f6601..028707b05a 100644 --- a/services/libs/database/src/tinybirdClient.ts +++ b/services/libs/database/src/tinybirdClient.ts @@ -180,15 +180,11 @@ export class TinybirdClient { // Tinybird expects URL-encoded form data, not JSON const payload = `delete_condition=${encodeURIComponent(deleteCondition)}` - const result = await axios.post( - url, - payload, - { - headers: this.getHeaders('application/x-www-form-urlencoded'), - responseType: 'json', - httpsAgent: TinybirdClient.httpsAgent, - }, - ) + const result = await axios.post(url, payload, { + headers: this.getHeaders('application/x-www-form-urlencoded'), + responseType: 'json', + httpsAgent: TinybirdClient.httpsAgent, + }) return result.data }