Skip to content

Commit 0ae72e2

Browse files
committed
short circuit
1 parent 6c8d4ad commit 0ae72e2

File tree

6 files changed

+35
-11
lines changed

6 files changed

+35
-11
lines changed

example/convex/_generated/api.d.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ export declare const components: {
4848
load: FunctionReference<
4949
"query",
5050
"internal",
51-
{ workflowId: string },
51+
{ shortCircuit?: boolean; workflowId: string },
5252
{
53+
blocked?: boolean;
5354
journalEntries: Array<{
5455
_creationTime: number;
5556
_id: string;

src/client/step.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
valueSize,
1919
} from "../component/schema.js";
2020
import type { SchedulerOptions, WorkflowComponent } from "./types.js";
21+
import { MAX_JOURNAL_SIZE } from "../shared.js";
2122

2223
export type WorkerResult =
2324
| { type: "handlerDone"; runResult: RunResult }
@@ -35,8 +36,6 @@ export type StepRequest = {
3536
reject: (error: unknown) => void;
3637
};
3738

38-
const MAX_JOURNAL_SIZE = 8 << 20;
39-
4039
export class StepExecutor {
4140
private journalEntrySize: number;
4241

@@ -56,6 +55,7 @@ export class StepExecutor {
5655
);
5756

5857
if (this.journalEntrySize > MAX_JOURNAL_SIZE) {
58+
// This should never happen, but we'll throw an error just in case.
5959
throw new Error(journalSizeError(this.journalEntrySize, this.workflowId));
6060
}
6161
}

src/client/workflowMutation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export function workflowMutation<ArgsValidator extends PropertyValidators>(
6969
const { workflowId, generationNumber } = args;
7070
const { workflow, logLevel, journalEntries, ok } = await ctx.runQuery(
7171
component.journal.load,
72-
{ workflowId },
72+
{ workflowId, shortCircuit: true },
7373
);
7474
const inProgress = journalEntries.filter(({ step }) => step.inProgress);
7575
const console = createLogger(logLevel);

src/component/_generated/api.d.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ export type Mounts = {
4242
load: FunctionReference<
4343
"query",
4444
"public",
45-
{ workflowId: string },
45+
{ shortCircuit?: boolean; workflowId: string },
4646
{
47+
blocked?: boolean;
4748
journalEntries: Array<{
4849
_creationTime: number;
4950
_id: string;

src/component/journal.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,53 @@ import { internal } from "./_generated/api.js";
1919
import { type FunctionHandle } from "convex/server";
2020
import { getDefaultLogger } from "./utils.js";
2121
import { assert } from "convex-helpers";
22+
import { MAX_JOURNAL_SIZE } from "../shared.js";
2223

2324
export const load = query({
2425
args: {
2526
workflowId: v.id("workflows"),
27+
shortCircuit: v.optional(v.boolean()),
2628
},
2729
returns: v.object({
2830
workflow: workflowDocument,
2931
journalEntries: v.array(journalDocument),
3032
ok: v.boolean(),
3133
logLevel,
34+
blocked: v.optional(v.boolean()),
3235
}),
33-
handler: async (ctx, { workflowId }) => {
36+
handler: async (ctx, { workflowId, shortCircuit }) => {
3437
const workflow = await ctx.db.get(workflowId);
3538
assert(workflow, `Workflow not found: ${workflowId}`);
3639
const { logLevel } = await getDefaultLogger(ctx);
3740
const journalEntries: JournalEntry[] = [];
38-
let sizeSoFar = 0;
41+
let journalSize = 0;
42+
if (shortCircuit) {
43+
const inProgress = await ctx.db
44+
.query("steps")
45+
.withIndex("inProgress", (q) =>
46+
q.eq("step.inProgress", true).eq("workflowId", workflowId),
47+
)
48+
.first();
49+
if (inProgress) {
50+
return {
51+
journalEntries: [inProgress],
52+
blocked: true,
53+
workflow,
54+
logLevel,
55+
ok: false,
56+
};
57+
}
58+
}
3959
for await (const entry of ctx.db
4060
.query("steps")
4161
.withIndex("workflow", (q) => q.eq("workflowId", workflowId))) {
4262
journalEntries.push(entry);
43-
sizeSoFar += journalEntrySize(entry);
44-
if (sizeSoFar > 4 * 1024 * 1024) {
45-
return { journalEntries, ok: false, workflow, logLevel };
63+
journalSize += journalEntrySize(entry);
64+
if (journalSize > MAX_JOURNAL_SIZE) {
65+
return { journalEntries, workflow, logLevel, ok: false };
4666
}
4767
}
48-
return { journalEntries, ok: true, workflow, logLevel };
68+
return { journalEntries, workflow, logLevel, ok: true };
4969
},
5070
});
5171

src/shared.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
export const MAX_JOURNAL_SIZE = 8 << 20;
2+
13
export function formatErrorWithStack(error: unknown): string {
24
if (error instanceof Error) {
35
return error.toString() + (error.stack ? "\n" + error.stack : "");

0 commit comments

Comments
 (0)