Skip to content

Commit c635826

Browse files
authored
minor: move latte to its own worker and queue (#1714)
1 parent 426a2e8 commit c635826

File tree

13 files changed

+70
-36
lines changed

13 files changed

+70
-36
lines changed

apps/workers/src/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ const gracefulShutdown = async (signal: string) => {
9494
process.on('SIGINT', () => gracefulShutdown('SIGINT'))
9595
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))
9696

97-
process.on('uncaughtException', function(err) {
97+
process.on('uncaughtException', function (err) {
9898
captureException(err)
9999
})
100100

apps/workers/src/workers/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
startEventHandlersWorker,
77
startEventsWorker,
88
} from './worker-definitions/eventsWorker'
9+
import { startLatteWorker } from './worker-definitions/latteWorker'
910
import { startMaintenanceWorker } from './worker-definitions/maintenanceWorker'
1011
import { startTracingWorker } from './worker-definitions/tracingWorker'
1112
import { startWebhooksWorker } from './worker-definitions/webhooksWorker'
@@ -15,6 +16,7 @@ export async function startWorkers() {
1516
const evaluationsWorker = startEvaluationsWorker()
1617
const eventsWorker = startEventsWorker()
1718
const eventHandlersWorker = startEventHandlersWorker()
19+
const latteWorker = startLatteWorker()
1820
const maintenanceWorker = startMaintenanceWorker()
1921
const webhooksWorker = startWebhooksWorker()
2022
const documentsWorker = startDocumentsWorker()
@@ -26,6 +28,7 @@ export async function startWorkers() {
2628
evaluationsWorker,
2729
eventsWorker,
2830
eventHandlersWorker,
31+
latteWorker,
2932
maintenanceWorker,
3033
webhooksWorker,
3134
documentsWorker,

apps/workers/src/workers/worker-definitions/documentsWorker.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { WORKER_CONNECTION_CONFIG } from '../utils/connectionConfig'
66
const jobMappings = {
77
runDocumentJob: jobs.runDocumentJob,
88
runDocumentForExperimentJob: jobs.runDocumentForExperimentJob,
9-
runLatteJob: jobs.runLatteJob,
109
runDocumentTriggerEventJob: jobs.runDocumentTriggerEventJob,
1110
}
1211

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { Queues } from '@latitude-data/core/queues/types'
2+
import * as jobs from '@latitude-data/core/jobs/definitions'
3+
import { createWorker } from '../utils/createWorker'
4+
import { WORKER_CONNECTION_CONFIG } from '../utils/connectionConfig'
5+
6+
const jobMappings = {
7+
runLatteJob: jobs.runLatteJob,
8+
}
9+
10+
export function startLatteWorker() {
11+
return createWorker(Queues.latteQueue, jobMappings, {
12+
concurrency: 25,
13+
connection: WORKER_CONNECTION_CONFIG,
14+
})
15+
}

apps/workers/src/workers/worker-definitions/maintenanceWorker.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ const jobMappings = {
1919

2020
// Migrate provider logs to object storage
2121
scheduleProviderLogsMigrationJobs: jobs.scheduleProviderLogsMigrationJobs,
22-
migrateProviderLogsToObjectStorageJob: jobs.migrateProviderLogsToObjectStorageJob,
22+
migrateProviderLogsToObjectStorageJob:
23+
jobs.migrateProviderLogsToObjectStorageJob,
2324

2425
// Migrate document logs workspace ids
2526
scheduleWorkspaceLogsMigrationJobs: jobs.scheduleWorkspaceLogsMigrationJobs,

packages/core/src/jobs/job-definitions/copilot/chat.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ export const runLatteJob = async (job: Job<RunLatteJobData>) => {
118118
abortSignal: controller.signal,
119119
debugVersionUuid,
120120
})
121+
121122
if (!runResult.ok) {
122123
await emitError({
123124
workspaceId,

packages/core/src/jobs/queues/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ let _queues:
1414
maintenanceQueue: Queue
1515
tracingQueue: Queue
1616
webhooksQueue: Queue
17+
latteQueue: Queue
1718
}
1819
| undefined
1920

@@ -33,8 +34,8 @@ export async function queues() {
3334
type: 'exponential',
3435
delay: 1000,
3536
},
36-
removeOnFail: true,
37-
removeOnComplete: true,
37+
removeOnFail: 100,
38+
removeOnComplete: 100,
3839
},
3940
}
4041

@@ -48,6 +49,7 @@ export async function queues() {
4849
maintenanceQueue: new Queue(Queues.maintenanceQueue, options),
4950
tracingQueue: new Queue(Queues.tracingQueue, options),
5051
webhooksQueue: new Queue(Queues.webhooksQueue, options),
52+
latteQueue: new Queue(Queues.latteQueue, options),
5153
}
5254

5355
return _queues

packages/core/src/jobs/queues/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ export enum Queues {
88
documentsQueue = 'documentsQueue',
99
documentSuggestionsQueue = 'documentSuggestionsQueue',
1010
tracingQueue = 'tracing',
11+
latteQueue = 'latteQueue',
1112
}

packages/core/src/repositories/documentLogsRepository/index.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@ export class DocumentLogsRepository extends Repository<DocumentLog> {
3131
commits,
3232
and(isNull(commits.deletedAt), eq(commits.id, documentLogs.commitId)),
3333
)
34-
.innerJoin(
35-
projects,
36-
eq(projects.id, commits.projectId),
37-
)
34+
.innerJoin(projects, eq(projects.id, commits.projectId))
3835
.leftJoin(
3936
runErrors,
4037
and(
@@ -51,17 +48,23 @@ export class DocumentLogsRepository extends Repository<DocumentLog> {
5148
return Result.error(new NotFoundError('DocumentLog not found'))
5249
}
5350

54-
const result = await this.scope.where(
55-
and(this.scopeFilter, eq(documentLogs.uuid, uuid)),
56-
)
57-
58-
if (!result.length) {
59-
return Result.error(
60-
new NotFoundError(`DocumentLog not found with uuid ${uuid}`),
51+
try {
52+
const result = await this.scope.where(
53+
and(this.scopeFilter, eq(documentLogs.uuid, uuid)),
6154
)
62-
}
6355

64-
return Result.ok(result[0]!)
56+
if (!result.length) {
57+
return Result.error(
58+
new NotFoundError(`DocumentLog not found with uuid ${uuid}`),
59+
)
60+
}
61+
62+
return Result.ok(result[0]!)
63+
} catch (err) {
64+
console.log(err)
65+
66+
return Result.error(err as Error)
67+
}
6568
}
6669

6770
async totalCountSinceDate(minDate: Date) {

packages/core/src/schema/models/documentLogs.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ export const documentLogs = latitudeSchema.table(
2121
{
2222
id: bigserial('id', { mode: 'number' }).notNull().primaryKey(),
2323
uuid: uuid('uuid').notNull().unique(),
24-
workspaceId: bigint('workspace_id', { mode: 'number' }).references(() => workspaces.id, { onDelete: 'cascade' }),
24+
workspaceId: bigint('workspace_id', { mode: 'number' }).references(
25+
() => workspaces.id,
26+
{ onDelete: 'cascade' },
27+
),
2528
documentUuid: uuid('document_uuid').notNull(),
2629
commitId: bigint('commit_id', { mode: 'number' })
2730
.notNull()

0 commit comments

Comments
 (0)