Skip to content

Commit 26d4d08

Browse files
authored
chore: move all placement tag helpers to core (#2403)
* chore: move all placement tag helpers to core * move placement tag utils into server-only
1 parent 1a88809 commit 26d4d08

File tree

5 files changed

+71
-50
lines changed

5 files changed

+71
-50
lines changed

apps/supervisor/src/workloadManager/kubernetes.ts

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
type WorkloadManagerOptions,
66
} from "./types.js";
77
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
8+
import { PlacementTagProcessor } from "@trigger.dev/core/v3/serverOnly";
89
import { env } from "../env.js";
910
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
1011
import { getRunnerId } from "../util.js";
@@ -13,18 +14,18 @@ type ResourceQuantities = {
1314
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
1415
};
1516

16-
interface PlacementConfig {
17-
enabled: boolean;
18-
prefix: string;
19-
}
20-
2117
export class KubernetesWorkloadManager implements WorkloadManager {
2218
private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider");
2319
private k8s: K8sApi;
2420
private namespace = env.KUBERNETES_NAMESPACE;
21+
private placementTagProcessor: PlacementTagProcessor;
2522

2623
constructor(private opts: WorkloadManagerOptions) {
2724
this.k8s = createK8sApi();
25+
this.placementTagProcessor = new PlacementTagProcessor({
26+
enabled: env.PLACEMENT_TAGS_ENABLED,
27+
prefix: env.PLACEMENT_TAGS_PREFIX,
28+
});
2829

2930
if (opts.workloadApiDomain) {
3031
this.logger.warn("[KubernetesWorkloadManager] ⚠️ Custom workload API domain", {
@@ -33,56 +34,21 @@ export class KubernetesWorkloadManager implements WorkloadManager {
3334
}
3435
}
3536

36-
private get placementConfig(): PlacementConfig {
37-
return {
38-
enabled: env.PLACEMENT_TAGS_ENABLED,
39-
prefix: env.PLACEMENT_TAGS_PREFIX,
40-
};
41-
}
42-
4337
private addPlacementTags(
4438
podSpec: Omit<k8s.V1PodSpec, "containers">,
4539
placementTags?: PlacementTag[]
4640
): Omit<k8s.V1PodSpec, "containers"> {
47-
if (!this.placementConfig.enabled || !placementTags || placementTags.length === 0) {
48-
return podSpec;
49-
}
50-
51-
const nodeSelector: Record<string, string> = { ...podSpec.nodeSelector };
52-
53-
// Convert placement tags to nodeSelector labels
54-
for (const tag of placementTags) {
55-
const labelKey = `${this.placementConfig.prefix}/${tag.key}`;
56-
57-
// Print warnings (if any)
58-
this.printTagWarnings(tag);
59-
60-
// For now we only support single values via nodeSelector
61-
nodeSelector[labelKey] = tag.values?.[0] ?? "";
62-
}
41+
const nodeSelector = this.placementTagProcessor.convertToNodeSelector(
42+
placementTags,
43+
podSpec.nodeSelector
44+
);
6345

6446
return {
6547
...podSpec,
6648
nodeSelector,
6749
};
6850
}
6951

70-
private printTagWarnings(tag: PlacementTag) {
71-
if (!tag.values || tag.values.length === 0) {
72-
// No values provided
73-
this.logger.warn(
74-
"[KubernetesWorkloadManager] Placement tag has no values, using empty string",
75-
tag
76-
);
77-
} else if (tag.values.length > 1) {
78-
// Multiple values provided
79-
this.logger.warn(
80-
"[KubernetesWorkloadManager] Placement tag has multiple values, only using first one",
81-
tag
82-
);
83-
}
84-
}
85-
8652
async create(opts: WorkloadManagerCreateOptions) {
8753
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });
8854

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import type { BillingCache } from "../billingCache.js";
22
import { startSpan } from "@internal/tracing";
33
import { assertExhaustive } from "@trigger.dev/core";
4-
import { DequeuedMessage, RetryOptions, placementTag } from "@trigger.dev/core/v3";
4+
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
5+
import { placementTag } from "@trigger.dev/core/v3/serverOnly";
56
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
67
import { PrismaClientOrTransaction } from "@trigger.dev/database";
78
import { getRunWithBackgroundWorkerTasks } from "../db/worker.js";

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,6 @@ export const PlacementTag = z.object({
230230
});
231231
export type PlacementTag = z.infer<typeof PlacementTag>;
232232

233-
/** Helper functions for placement tags. In the future this will be able to support multiple values and operators. For now it's just a single value. */
234-
export function placementTag(key: string, value: string): PlacementTag {
235-
return { key, values: [value] };
236-
}
237-
238233
/** This is sent to a Worker when a run is dequeued (a new run or continuing run) */
239234
export const DequeuedMessage = z.object({
240235
version: z.literal("1"),

packages/core/src/v3/serverOnly/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ export * from "./shutdownManager.js";
66
export * from "./k8s.js";
77
export * from "./jumpHash.js";
88
export * from "../apiClient/version.js";
9+
export * from "./placementTags.js";
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { type PlacementTag } from "../schemas/index.js";
2+
import { SimpleStructuredLogger } from "../utils/structuredLogger.js";
3+
4+
export interface PlacementConfig {
5+
enabled: boolean;
6+
prefix: string;
7+
}
8+
9+
export class PlacementTagProcessor {
10+
private readonly logger = new SimpleStructuredLogger("placement-tag-processor");
11+
12+
constructor(private readonly config: PlacementConfig) {}
13+
14+
/**
15+
* Converts placement tags to Kubernetes nodeSelector labels
16+
*/
17+
convertToNodeSelector(
18+
placementTags?: PlacementTag[],
19+
existingNodeSelector?: Record<string, string>
20+
): Record<string, string> {
21+
if (!this.config.enabled || !placementTags || placementTags.length === 0) {
22+
return existingNodeSelector ?? {};
23+
}
24+
25+
const nodeSelector: Record<string, string> = { ...existingNodeSelector };
26+
27+
// Convert placement tags to nodeSelector labels
28+
for (const tag of placementTags) {
29+
const labelKey = `${this.config.prefix}/${tag.key}`;
30+
31+
// Print warnings (if any)
32+
this.printTagWarnings(tag);
33+
34+
// For now we only support single values via nodeSelector
35+
nodeSelector[labelKey] = tag.values?.[0] ?? "";
36+
}
37+
38+
return nodeSelector;
39+
}
40+
41+
private printTagWarnings(tag: PlacementTag) {
42+
if (!tag.values || tag.values.length === 0) {
43+
// No values provided
44+
this.logger.warn("Placement tag has no values, using empty string", tag);
45+
} else if (tag.values.length > 1) {
46+
// Multiple values provided
47+
this.logger.warn("Placement tag has multiple values, only using first one", tag);
48+
}
49+
}
50+
}
51+
52+
/**
53+
* Helper function to create a placement tag. In the future this will be able to support multiple values and operators.
54+
* For now it's just a single value.
55+
*/
56+
export function placementTag(key: string, value: string): PlacementTag {
57+
return { key, values: [value] };
58+
}

0 commit comments

Comments
 (0)