Skip to content

Commit 6e1db36

Browse files
authored
fix: members-enrichment workflow timeouts and db savepoint errors (#3585)
1 parent 6814492 commit 6e1db36

File tree

5 files changed

+55
-67
lines changed

5 files changed

+55
-67
lines changed

services/apps/members_enrichment_worker/src/activities/enrichment.ts

Lines changed: 34 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -249,22 +249,19 @@ export async function updateMemberUsingSquashedPayload(
249249
return await svc.postgres.writer.transactionally(async (tx) => {
250250
let updated = false
251251
const qx = dbStoreQx(tx)
252-
const promises = []
253252

254253
// process identities
255254
if (squashedPayload.identities.length > 0) {
256255
svc.log.debug({ memberId }, 'Adding to member identities!')
257256
for (const i of squashedPayload.identities) {
258257
updated = true
259-
promises.push(
260-
upsertMemberIdentity(qx, {
261-
memberId,
262-
platform: i.platform,
263-
type: i.type,
264-
value: i.value,
265-
verified: i.verified,
266-
}),
267-
)
258+
await upsertMemberIdentity(qx, {
259+
memberId,
260+
platform: i.platform,
261+
type: i.type,
262+
value: i.value,
263+
verified: i.verified,
264+
})
268265
}
269266
}
270267

@@ -273,30 +270,22 @@ export async function updateMemberUsingSquashedPayload(
273270
// it's ommited from the payload because it takes a lot of space
274271
svc.log.debug('Processing contributions! ', { memberId, hasContributions })
275272
if (hasContributions) {
276-
promises.push(
277-
findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
278-
.then((caches) => {
279-
if (caches.length > 0 && caches[0].data) {
280-
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
281-
MemberEnrichmentSource.PROGAI,
282-
svc.log,
283-
)
284-
return progaiService.normalize(caches[0].data)
285-
}
286-
287-
return undefined
288-
})
289-
.then((normalized) => {
290-
if (normalized) {
291-
const typed = normalized as IMemberEnrichmentDataNormalized
273+
const caches = await findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
274+
if (caches?.length > 0 && caches[0]?.data) {
275+
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
276+
MemberEnrichmentSource.PROGAI,
277+
svc.log,
278+
)
279+
const normalized = progaiService.normalize(caches[0].data)
280+
if (normalized) {
281+
const typed = normalized as IMemberEnrichmentDataNormalized
292282

293-
if (typed.contributions) {
294-
updated = true
295-
return updateMemberContributions(qx, memberId, typed.contributions)
296-
}
297-
}
298-
}),
299-
)
283+
if (typed.contributions) {
284+
updated = true
285+
await updateMemberContributions(qx, memberId, typed.contributions)
286+
}
287+
}
288+
}
300289
}
301290

302291
// process attributes
@@ -312,7 +301,7 @@ export async function updateMemberUsingSquashedPayload(
312301
attributes = await setAttributesDefaultValues(attributes, priorities)
313302
}
314303
updated = true
315-
promises.push(updateMemberAttributes(qx, memberId, attributes))
304+
await updateMemberAttributes(qx, memberId, attributes)
316305
}
317306

318307
// process reach
@@ -332,7 +321,7 @@ export async function updateMemberUsingSquashedPayload(
332321
}
333322

334323
updated = true
335-
promises.push(updateMemberReach(qx, memberId, reach))
324+
await updateMemberReach(qx, memberId, reach)
336325
}
337326
}
338327

@@ -422,7 +411,7 @@ export async function updateMemberUsingSquashedPayload(
422411
if (results.toDelete.length > 0) {
423412
for (const org of results.toDelete) {
424413
updated = true
425-
promises.push(deleteMemberOrgById(tx.transaction(), memberId, org.id))
414+
await deleteMemberOrgById(tx.transaction(), memberId, org.id)
426415
}
427416
}
428417

@@ -432,30 +421,26 @@ export async function updateMemberUsingSquashedPayload(
432421
throw new Error('Organization ID is missing!')
433422
}
434423
updated = true
435-
promises.push(
436-
insertWorkExperience(
437-
tx.transaction(),
438-
memberId,
439-
org.organizationId,
440-
org.title,
441-
org.startDate,
442-
org.endDate,
443-
org.source,
444-
),
424+
await insertWorkExperience(
425+
tx.transaction(),
426+
memberId,
427+
org.organizationId,
428+
org.title,
429+
org.startDate,
430+
org.endDate,
431+
org.source,
445432
)
446433
}
447434
}
448435

449436
if (results.toUpdate.size > 0) {
450437
for (const [org, toUpdate] of results.toUpdate) {
451438
updated = true
452-
promises.push(updateMemberOrg(tx.transaction(), memberId, org, toUpdate))
439+
await updateMemberOrg(tx.transaction(), memberId, org, toUpdate)
453440
}
454441
}
455442
}
456443

457-
await Promise.all(promises)
458-
459444
if (updated) {
460445
await setMemberEnrichmentUpdatedAt(tx.transaction(), memberId)
461446
await syncMember(memberId)

services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export const scheduleMembersEnrichment = async () => {
2424
type: 'startWorkflow',
2525
workflowType: getMembersToEnrich,
2626
taskQueue: 'members-enrichment',
27-
workflowExecutionTimeout: '20 minutes',
27+
workflowExecutionTimeout: '2 hours',
2828
retry: {
2929
initialInterval: '15 seconds',
3030
backoffCoefficient: 2,

services/apps/members_enrichment_worker/src/workflows/enrichMember.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const {
2424
hasRemainingCredits,
2525
getMemberById,
2626
} = proxyActivities<typeof activities>({
27-
startToCloseTimeout: '5 minutes',
27+
startToCloseTimeout: '10 minutes',
2828
retry: {
2929
initialInterval: '60s',
3030
backoffCoefficient: 2.0,
@@ -82,7 +82,7 @@ export async function enrichMember(
8282
workflowId: 'member-enrichment/' + input.id + '/processMemberSources',
8383
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
8484
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
85-
workflowExecutionTimeout: '15 minutes',
85+
workflowExecutionTimeout: '25 minutes',
8686
retry: {
8787
backoffCoefficient: 2,
8888
maximumAttempts: 10,

services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { chunkArray } from '../utils/common'
1414
import { enrichMember } from './enrichMember'
1515

1616
const { getEnrichableMembers, getMaxConcurrentRequests } = proxyActivities<typeof activities>({
17-
startToCloseTimeout: '10 minutes',
17+
startToCloseTimeout: '15 minutes',
1818
})
1919

2020
export async function getMembersToEnrich(): Promise<void> {
@@ -46,7 +46,7 @@ export async function getMembersToEnrich(): Promise<void> {
4646
workflowId: 'member-enrichment/' + member.id,
4747
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
4848
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
49-
workflowExecutionTimeout: '15 minutes',
49+
workflowExecutionTimeout: '30 minutes',
5050
retry: {
5151
backoffCoefficient: 2,
5252
maximumAttempts: 10,

services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -469,25 +469,28 @@ export async function deleteMemberOrg(db: DbConnOrTx, memberId: string, organiza
469469
})
470470
}
471471

472-
export async function deleteMemberOrgById(db: DbConnOrTx, memberId: string, id: string) {
473-
await db.tx(async (tx) => {
474-
await tx.none(
475-
`
472+
export async function deleteMemberOrgById(
473+
tx: DbTransaction,
474+
memberId: string,
475+
id: string,
476+
): Promise<void> {
477+
// Execute directly on the provided transaction to avoid creating nested savepoints
478+
await tx.none(
479+
`
476480
DELETE FROM "memberOrganizationAffiliationOverrides"
477481
WHERE "memberOrganizationId" = $(id);
478-
`,
479-
{ id },
480-
)
482+
`,
483+
{ id },
484+
)
481485

482-
await tx.none(
483-
`
486+
await tx.none(
487+
`
484488
UPDATE "memberOrganizations"
485489
SET "deletedAt" = NOW()
486490
WHERE "memberId" = $(memberId) and id = $(id);
487-
`,
488-
{ memberId, id },
489-
)
490-
})
491+
`,
492+
{ memberId, id },
493+
)
491494
}
492495

493496
export async function findMemberOrgs(db: DbStore, memberId: string, orgId: string) {

0 commit comments

Comments
 (0)