Skip to content

Commit 1b6a39d

Browse files
committed
chore: Formatting
1 parent d744ab3 commit 1b6a39d

File tree

5 files changed

+55
-26
lines changed

5 files changed

+55
-26
lines changed

control-plane/src/modules/integrations/langfuse.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const langfuseCache = new NodeCache({
1717
});
1818

1919
const integrationsCache = createCache<z.infer<typeof integrationSchema>>(
20-
Symbol("langfuseIntegrations")
20+
Symbol("langfuseIntegrations"),
2121
);
2222

2323
export async function getLangfuseClient(clusterId: string) {
@@ -55,7 +55,7 @@ export async function getLangfuseClient(clusterId: string) {
5555
client: langfuse,
5656
sendMessagePayloads: integrations.langfuse.sendMessagePayloads,
5757
},
58-
60
58+
60,
5959
); // Cache for 1 minute
6060

6161
return {
@@ -74,7 +74,9 @@ export async function flushCluster(clusterId: string) {
7474
}
7575
}
7676

77-
export async function processModelCall(event: z.infer<typeof modelCallEventSchema>) {
77+
export async function processModelCall(
78+
event: z.infer<typeof modelCallEventSchema>,
79+
) {
7880
const langfuse = await getLangfuseClient(event.clusterId);
7981

8082
const trace = langfuse?.client.trace({
@@ -99,7 +101,9 @@ export async function processModelCall(event: z.infer<typeof modelCallEventSchem
99101
});
100102
}
101103

102-
export async function processRunFeedback(event: z.infer<typeof runFeedbackEventSchema>) {
104+
export async function processRunFeedback(
105+
event: z.infer<typeof runFeedbackEventSchema>,
106+
) {
103107
const langfuse = await getLangfuseClient(event.clusterId);
104108

105109
langfuse?.client.score({
@@ -110,7 +114,9 @@ export async function processRunFeedback(event: z.infer<typeof runFeedbackEventS
110114
});
111115
}
112116

113-
export async function processToolCall(event: z.infer<typeof toolCallEventSchema>) {
117+
export async function processToolCall(
118+
event: z.infer<typeof toolCallEventSchema>,
119+
) {
114120
const langfuse = await getLangfuseClient(event.clusterId);
115121

116122
langfuse?.client.span({
@@ -133,9 +139,12 @@ export const start = () => {
133139
} else if (data.type === "toolCall") {
134140
await processToolCall(data);
135141
} else {
136-
logger.error("Received customer telemetry message that does not conform to expected schema", {
137-
message: data,
138-
});
142+
logger.error(
143+
"Received customer telemetry message that does not conform to expected schema",
144+
{
145+
message: data,
146+
},
147+
);
139148
}
140149

141150
await flushCluster(data.clusterId);

control-plane/src/modules/integrations/slack/receiver.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import {
88
Logger,
99
LogLevel,
1010
} from "@slack/bolt";
11-
import { FastifyInstance, FastifyPluginCallback, FastifyReply, FastifyRequest } from "fastify";
11+
import {
12+
FastifyInstance,
13+
FastifyPluginCallback,
14+
FastifyReply,
15+
FastifyRequest,
16+
} from "fastify";
1217
import { logger } from "../../observability/logger";
1318

1419
const slackLogger: Logger = {
@@ -48,16 +53,21 @@ export class FastifySlackReceiver implements Receiver {
4853

4954
// Register a seperate plugin and disable the content type parsers for the route
5055
const slackPlugin: FastifyPluginCallback = async instance => {
51-
const contentTypes = ["application/json", "application/x-www-form-urlencoded"];
56+
const contentTypes = [
57+
"application/json",
58+
"application/x-www-form-urlencoded",
59+
];
5260

5361
instance.removeContentTypeParser(contentTypes);
5462
instance.addContentTypeParser(
5563
contentTypes,
5664
{ parseAs: "string" },
57-
instance.defaultTextParser
65+
instance.defaultTextParser,
5866
);
5967

60-
instance.post("", (request, reply) => this.requestHandler(request, reply));
68+
instance.post("", (request, reply) =>
69+
this.requestHandler(request, reply),
70+
);
6171
};
6272

6373
await this.fastify.register(slackPlugin, { prefix: this.path });
@@ -93,7 +103,7 @@ export class FastifySlackReceiver implements Receiver {
93103
enabled: true,
94104
signingSecret: this.signingSecret,
95105
},
96-
req
106+
req,
97107
);
98108
} catch (error) {
99109
logger.warn("Failed to parse and verify Slack request", {

control-plane/src/modules/jobs/self-heal-jobs.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ export async function selfHealJobs() {
1818
eq(data.jobs.status, "running"),
1919
lt(
2020
data.jobs.last_retrieved_at,
21-
sql`now() - interval '1 second' * timeout_interval_seconds`
21+
sql`now() - interval '1 second' * timeout_interval_seconds`,
2222
),
2323
// only timeout jobs that have a timeout set
2424
isNotNull(data.jobs.timeout_interval_seconds),
2525
// Don't time out jobs that have pending approval requests
26-
or(eq(data.jobs.approval_requested, false), isNotNull(data.jobs.approved))
27-
)
26+
or(
27+
eq(data.jobs.approval_requested, false),
28+
isNotNull(data.jobs.approved),
29+
),
30+
),
2831
)
2932
.returning({
3033
id: data.jobs.id,
@@ -53,7 +56,7 @@ export async function selfHealJobs() {
5356
.set({
5457
status: "stalled",
5558
// Don't subtrack this type of retry from the remaining attempts count
56-
remaining_attempts: sql`remaining_attempts + 1`
59+
remaining_attempts: sql`remaining_attempts + 1`,
5760
})
5861
.where(
5962
and(
@@ -62,10 +65,10 @@ export async function selfHealJobs() {
6265
lt(
6366
data.jobs.updated_at,
6467
// Find any jobs that have been interrupted for more than 5 minutes
65-
sql`now() - interval '5 minutes'`
68+
sql`now() - interval '5 minutes'`,
6669
),
6770
eq(data.jobs.approval_requested, false),
68-
)
71+
),
6972
)
7073
.returning({
7174
id: data.jobs.id,
@@ -76,9 +79,9 @@ export async function selfHealJobs() {
7679
if (nonResumedInterruptions.length > 0) {
7780
logger.warn("Found interrupted jobs that have not been resumed", {
7881
count: nonResumedInterruptions.length,
79-
jobs: nonResumedInterruptions.map(row => row.id).join(", ")
82+
jobs: nonResumedInterruptions.map(row => row.id).join(", "),
8083
});
81-
};
84+
}
8285

8386
const stalledJobs = await data.db
8487
.update(data.jobs)
@@ -125,7 +128,9 @@ export async function selfHealJobs() {
125128

126129
return {
127130
stalledFailedByTimeout: stalledByTimeout.map(row => row.id),
128-
stalledRecovered: stalledJobs.filter(row => row.status === "pending").map(row => row.id),
131+
stalledRecovered: stalledJobs
132+
.filter(row => row.status === "pending")
133+
.map(row => row.id),
129134
nonResumedInterruptions: nonResumedInterruptions.map(row => row.id),
130135
};
131136
}

control-plane/src/modules/queues/observability.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ export const baseMessageSchema = z
1818
export type BaseMessage = z.infer<typeof baseMessageSchema>;
1919

2020
export const withObservability =
21-
<T>(queueName: string, fn: (message: T) => Promise<void>): Processor<T> => async (job: Job<T>) => {
21+
<T>(queueName: string, fn: (message: T) => Promise<void>): Processor<T> =>
22+
async (job: Job<T>) => {
2223
const zodResult = baseMessageSchema.safeParse(job.data);
2324

2425
if (!zodResult.success) {
@@ -37,7 +38,7 @@ export const withObservability =
3738
"run.id": parsed.runId,
3839
"queue.name": queueName,
3940
"queue.job.id": job.id,
40-
"queue.job.attemptsMade": job.attemptsMade
41+
"queue.job.attemptsMade": job.attemptsMade,
4142
};
4243

4344
// Existing trace context propogated via the job data
@@ -53,7 +54,7 @@ export const withObservability =
5354
{
5455
attributes,
5556
},
56-
existingTraceContext
57+
existingTraceContext,
5758
);
5859
} catch (e) {
5960
if (isRetryableError(e)) {

control-plane/src/modules/router/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1713,7 +1713,11 @@ export const router = initServer().router(contract, {
17131713
};
17141714
}
17151715

1716-
await kv.setIfNotExists(clusterId, messageKey, JSON.stringify(result.structured));
1716+
await kv.setIfNotExists(
1717+
clusterId,
1718+
messageKey,
1719+
JSON.stringify(result.structured),
1720+
);
17171721

17181722
return {
17191723
status: 200,

0 commit comments

Comments
 (0)