Skip to content

Commit 6137338

Browse files
authored
feat(streams): make v2 streams the default when using 4.1.0+ if they are supported (#2677)
1 parent 8cec3b7 commit 6137338

File tree

12 files changed

+57
-24
lines changed

12 files changed

+57
-24
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,7 @@ const EnvironmentSchema = z
12361236
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
12371237
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
12381238
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
1239+
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
12391240
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
12401241
})
12411242
.and(GithubAppEnvSchema)

apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { authenticateApiRequest } from "~/services/apiAuth.server";
99
import { logger } from "~/services/logger.server";
1010
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
1111
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
12+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
1213

1314
const ParamsSchema = z.object({
1415
taskId: z.string(),
@@ -40,6 +41,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
4041
"trigger-version": triggerVersion,
4142
"x-trigger-span-parent-as-link": spanParentAsLink,
4243
"x-trigger-worker": isFromWorker,
44+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
4345
traceparent,
4446
tracestate,
4547
} = headers.data;
@@ -100,6 +102,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
100102
triggerVersion: triggerVersion ?? undefined,
101103
traceContext,
102104
spanParentAsLink: spanParentAsLink === 1,
105+
realtimeStreamsVersion: determineRealtimeStreamsVersion(realtimeStreamsVersion ?? undefined),
103106
});
104107

105108
if (!result) {

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { prisma } from "~/db.server";
1111
import { env } from "~/env.server";
1212
import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server";
1313
import { logger } from "~/services/logger.server";
14+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
1415
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1516
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
1617
import {
@@ -126,7 +127,9 @@ const { action, loader } = createActionApiRoute(
126127
traceContext,
127128
spanParentAsLink: spanParentAsLink === 1,
128129
oneTimeUseToken,
129-
realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
130+
realtimeStreamsVersion: determineRealtimeStreamsVersion(
131+
realtimeStreamsVersion ?? undefined
132+
),
130133
},
131134
engineVersion ?? undefined
132135
);

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
} from "~/v3/services/batchTriggerV3.server";
1717
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1818
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
19+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
1920

2021
const { action, loader } = createActionApiRoute(
2122
{
@@ -69,6 +70,7 @@ const { action, loader } = createActionApiRoute(
6970
"x-trigger-client": triggerClient,
7071
"x-trigger-engine-version": engineVersion,
7172
"batch-processing-strategy": batchProcessingStrategy,
73+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
7274
traceparent,
7375
tracestate,
7476
} = headers;
@@ -107,6 +109,9 @@ const { action, loader } = createActionApiRoute(
107109
traceContext,
108110
spanParentAsLink: spanParentAsLink === 1,
109111
oneTimeUseToken,
112+
realtimeStreamsVersion: determineRealtimeStreamsVersion(
113+
realtimeStreamsVersion ?? undefined
114+
),
110115
});
111116

112117
const $responseHeaders = await responseHeaders(

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { ServiceValidationError } from "~/v3/services/baseService.server";
1818
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
1919
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
2020
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
21+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
2122

2223
const { action, loader } = createActionApiRoute(
2324
{
@@ -59,6 +60,7 @@ const { action, loader } = createActionApiRoute(
5960
"x-trigger-engine-version": engineVersion,
6061
"batch-processing-strategy": batchProcessingStrategy,
6162
"x-trigger-request-idempotency-key": requestIdempotencyKey,
63+
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
6264
traceparent,
6365
tracestate,
6466
} = headers;
@@ -119,6 +121,9 @@ const { action, loader } = createActionApiRoute(
119121
traceContext,
120122
spanParentAsLink: spanParentAsLink === 1,
121123
oneTimeUseToken,
124+
realtimeStreamsVersion: determineRealtimeStreamsVersion(
125+
realtimeStreamsVersion ?? undefined
126+
),
122127
});
123128

124129
const $responseHeaders = await responseHeaders(

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export type BatchTriggerTaskServiceOptions = {
4747
traceContext?: Record<string, string | undefined | Record<string, string | undefined>>;
4848
spanParentAsLink?: boolean;
4949
oneTimeUseToken?: string;
50+
realtimeStreamsVersion?: "v1" | "v2";
5051
};
5152

5253
/**
@@ -708,6 +709,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
708709
batchIndex: currentIndex,
709710
skipChecks: true, // Skip entitlement and queue checks since we already validated at batch/chunk level
710711
planType, // Pass planType from batch-level entitlement check
712+
realtimeStreamsVersion: options?.realtimeStreamsVersion,
711713
},
712714
"V2"
713715
);

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,22 @@ export function getRealtimeStreamInstance(
6060
}
6161
}
6262

63+
export function determineRealtimeStreamsVersion(streamVersion?: string): "v1" | "v2" {
64+
if (!streamVersion) {
65+
return env.REALTIME_STREAMS_DEFAULT_VERSION;
66+
}
67+
68+
if (
69+
streamVersion === "v2" &&
70+
env.REALTIME_STREAMS_S2_BASIN &&
71+
env.REALTIME_STREAMS_S2_ACCESS_TOKEN
72+
) {
73+
return "v2";
74+
}
75+
76+
return "v1";
77+
}
78+
6379
const s2RealtimeStreamsCache = singleton(
6480
"s2RealtimeStreamsCache",
6581
initializeS2RealtimeStreamsCache

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export type BatchTriggerTaskServiceOptions = {
5656
traceContext?: Record<string, string | undefined>;
5757
spanParentAsLink?: boolean;
5858
oneTimeUseToken?: string;
59+
realtimeStreamsVersion?: "v1" | "v2";
5960
};
6061

6162
type RunItemData = {
@@ -851,6 +852,7 @@ export class BatchTriggerV3Service extends BaseService {
851852
batchId: batch.friendlyId,
852853
skipChecks: true,
853854
runFriendlyId: task.runId,
855+
realtimeStreamsVersion: options?.realtimeStreamsVersion,
854856
}
855857
);
856858

apps/webapp/app/v3/services/replayTaskRun.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { BaseService } from "./baseService.server";
1111
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
1212
import { type RunOptionsData } from "../testTask";
1313
import { replaceSuperJsonPayload } from "@trigger.dev/core/v3/utils/ioSerialization";
14+
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
1415

1516
type OverrideOptions = {
1617
environmentId?: string;
@@ -118,7 +119,9 @@ export class ReplayTaskRunService extends BaseService {
118119
traceContext: {
119120
traceparent: `00-${existingTaskRun.traceId}-${existingTaskRun.spanId}-01`,
120121
},
121-
realtimeStreamsVersion: existingTaskRun.realtimeStreamsVersion,
122+
realtimeStreamsVersion: determineRealtimeStreamsVersion(
123+
existingTaskRun.realtimeStreamsVersion
124+
),
122125
}
123126
);
124127

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export type TriggerTaskServiceOptions = {
3333
overrideCreatedAt?: Date;
3434
replayedFromTaskRunFriendlyId?: string;
3535
planType?: string;
36-
realtimeStreamsVersion?: string;
36+
realtimeStreamsVersion?: "v1" | "v2";
3737
};
3838

3939
export class OutOfEntitlementError extends Error {

0 commit comments

Comments
 (0)