Skip to content

Commit fd246c4

Browse files
committed
New streams API (sync pipe with default)
1 parent 667bf8d commit fd246c4

File tree

12 files changed

+457
-82
lines changed

12 files changed

+457
-82
lines changed

packages/core/src/v3/realtimeStreams/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getGlobal, registerGlobal } from "../utils/globals.js";
22
import { NoopRealtimeStreamsManager } from "./noopManager.js";
33
import {
4-
RealtimeAppendStreamOptions,
4+
RealtimePipeStreamOptions,
55
RealtimeStreamInstance,
66
RealtimeStreamsManager,
77
} from "./types.js";
@@ -31,11 +31,11 @@ export class RealtimeStreamsAPI implements RealtimeStreamsManager {
3131
return getGlobal(API_NAME) ?? NOOP_MANAGER;
3232
}
3333

34-
public append<T>(
34+
public pipe<T>(
3535
key: string,
3636
source: AsyncIterable<T> | ReadableStream<T>,
37-
options?: RealtimeAppendStreamOptions
38-
): Promise<RealtimeStreamInstance<T>> {
39-
return this.#getManager().append(key, source, options);
37+
options?: RealtimePipeStreamOptions
38+
): RealtimeStreamInstance<T> {
39+
return this.#getManager().pipe(key, source, options);
4040
}
4141
}

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import {
44
ensureAsyncIterable,
55
} from "../streams/asyncIterableStream.js";
66
import {
7-
RealtimeAppendStreamOptions,
7+
RealtimePipeStreamOptions,
88
RealtimeStreamInstance,
99
RealtimeStreamsManager,
1010
} from "./types.js";
1111
import { taskContext } from "../task-context-api.js";
1212
import { ApiClient } from "../apiClient/index.js";
1313
import { StreamsWriterV1 } from "./streamsWriterV1.js";
1414
import { StreamsWriterV2 } from "./streamsWriterV2.js";
15+
import { StreamInstance } from "./streamInstance.js";
1516

1617
export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
1718
constructor(
@@ -29,11 +30,11 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
2930
this.activeStreams.clear();
3031
}
3132

32-
public async append<T>(
33+
public pipe<T>(
3334
key: string,
3435
source: AsyncIterable<T> | ReadableStream<T>,
35-
options?: RealtimeAppendStreamOptions
36-
): Promise<RealtimeStreamInstance<T>> {
36+
options?: RealtimePipeStreamOptions
37+
): RealtimeStreamInstance<T> {
3738
// Normalize ReadableStream to AsyncIterable
3839
const asyncIterableSource = ensureAsyncIterable(source);
3940

@@ -45,45 +46,25 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
4546
);
4647
}
4748

48-
const { version, headers } = await this.apiClient.createStream(
49-
runId,
50-
"self",
51-
key,
52-
options?.requestOptions
53-
);
54-
55-
const parsedResponse = parseCreateStreamResponse(version, headers);
56-
5749
// Create an AbortController for this stream
5850
const abortController = new AbortController();
5951
// Chain with user-provided signal if present
6052
const combinedSignal = options?.signal
6153
? AbortSignal.any?.([options.signal, abortController.signal]) ?? abortController.signal
6254
: abortController.signal;
6355

64-
const streamInstance =
65-
parsedResponse.version === "v1"
66-
? new StreamsWriterV1({
67-
key,
68-
runId,
69-
source: asyncIterableSource,
70-
baseUrl: this.baseUrl,
71-
headers: this.apiClient.getHeaders(),
72-
signal: combinedSignal,
73-
version,
74-
target: "self",
75-
})
76-
: new StreamsWriterV2({
77-
basin: parsedResponse.basin,
78-
stream: key,
79-
accessToken: parsedResponse.accessToken,
80-
source: asyncIterableSource,
81-
signal: combinedSignal,
82-
limiter: (await import("p-limit")).default,
83-
debug: this.debug,
84-
flushIntervalMs: parsedResponse.flushIntervalMs,
85-
maxRetries: parsedResponse.maxRetries,
86-
});
56+
const streamInstance = new StreamInstance({
57+
apiClient: this.apiClient,
58+
baseUrl: this.baseUrl,
59+
runId,
60+
key,
61+
source: asyncIterableSource,
62+
headers: this.apiClient.getHeaders(),
63+
signal: combinedSignal,
64+
requestOptions: options?.requestOptions,
65+
target: options?.target,
66+
debug: this.debug,
67+
});
8768

8869
// Register this stream
8970
const streamInfo = { wait: () => streamInstance.wait(), abortController };
@@ -94,9 +75,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
9475

9576
return {
9677
wait: () => streamInstance.wait(),
97-
get stream(): AsyncIterableStream<T> {
98-
return createAsyncIterableStreamFromAsyncIterable(streamInstance);
99-
},
78+
stream: streamInstance.stream,
10079
};
10180
}
10281

@@ -140,7 +119,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
140119
}
141120
}
142121

143-
function getRunIdForOptions(options?: RealtimeAppendStreamOptions): string | undefined {
122+
function getRunIdForOptions(options?: RealtimePipeStreamOptions): string | undefined {
144123
if (options?.target) {
145124
if (options.target === "parent") {
146125
return taskContext.ctx?.run?.parentTaskRunId;

packages/core/src/v3/realtimeStreams/noopManager.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,22 @@ import {
33
createAsyncIterableStreamFromAsyncIterable,
44
} from "../streams/asyncIterableStream.js";
55
import {
6-
RealtimeAppendStreamOptions,
6+
RealtimePipeStreamOptions,
77
RealtimeStreamInstance,
88
RealtimeStreamsManager,
99
} from "./types.js";
1010

1111
export class NoopRealtimeStreamsManager implements RealtimeStreamsManager {
12-
public append<T>(
12+
public pipe<T>(
1313
key: string,
1414
source: AsyncIterable<T> | ReadableStream<T>,
15-
options?: RealtimeAppendStreamOptions
16-
): Promise<RealtimeStreamInstance<T>> {
17-
return Promise.resolve({
15+
options?: RealtimePipeStreamOptions
16+
): RealtimeStreamInstance<T> {
17+
return {
1818
wait: () => Promise.resolve(),
1919
get stream(): AsyncIterableStream<T> {
2020
return createAsyncIterableStreamFromAsyncIterable(source);
2121
},
22-
});
22+
};
2323
}
2424
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import { ApiClient } from "../apiClient/index.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
3+
import { AnyZodFetchOptions } from "../zodfetch.js";
4+
import { StreamsWriterV1 } from "./streamsWriterV1.js";
5+
import { StreamsWriterV2 } from "./streamsWriterV2.js";
6+
import { StreamsWriter } from "./types.js";
7+
8+
export type StreamInstanceOptions<T> = {
9+
apiClient: ApiClient;
10+
baseUrl: string;
11+
runId: string;
12+
key: string;
13+
source: AsyncIterable<T>;
14+
headers?: Record<string, string>;
15+
signal?: AbortSignal;
16+
requestOptions?: AnyZodFetchOptions;
17+
target?: "self" | "parent" | "root" | string;
18+
debug?: boolean;
19+
};
20+
21+
type StreamsWriterInstance<T> = StreamsWriterV1<T> | StreamsWriterV2<T>;
22+
23+
export class StreamInstance<T> implements StreamsWriter {
24+
private streamPromise: Promise<StreamsWriterInstance<T>>;
25+
26+
constructor(private options: StreamInstanceOptions<T>) {
27+
this.streamPromise = this.initializeWriter();
28+
}
29+
30+
private async initializeWriter(): Promise<StreamsWriterInstance<T>> {
31+
const { version, headers } = await this.options.apiClient.createStream(
32+
this.options.runId,
33+
"self",
34+
this.options.key,
35+
this.options?.requestOptions
36+
);
37+
38+
const parsedResponse = parseCreateStreamResponse(version, headers);
39+
40+
const streamWriter =
41+
parsedResponse.version === "v1"
42+
? new StreamsWriterV1({
43+
key: this.options.key,
44+
runId: this.options.runId,
45+
source: this.options.source,
46+
baseUrl: this.options.baseUrl,
47+
headers: this.options.apiClient.getHeaders(),
48+
signal: this.options.signal,
49+
version,
50+
target: "self",
51+
})
52+
: new StreamsWriterV2({
53+
basin: parsedResponse.basin,
54+
stream: this.options.key,
55+
accessToken: parsedResponse.accessToken,
56+
source: this.options.source,
57+
signal: this.options.signal,
58+
limiter: (await import("p-limit")).default,
59+
debug: this.options.debug,
60+
flushIntervalMs: parsedResponse.flushIntervalMs,
61+
maxRetries: parsedResponse.maxRetries,
62+
});
63+
64+
return streamWriter;
65+
}
66+
67+
public async wait(): Promise<void> {
68+
return this.streamPromise.then((writer) => writer.wait());
69+
}
70+
71+
public get stream(): AsyncIterableStream<T> {
72+
const self = this;
73+
74+
return new ReadableStream<T>({
75+
async start(controller) {
76+
const streamWriter = await self.streamPromise;
77+
78+
const iterator = streamWriter[Symbol.asyncIterator]();
79+
80+
while (true) {
81+
if (self.options.signal?.aborted) {
82+
break;
83+
}
84+
85+
const { done, value } = await iterator.next();
86+
87+
if (done) {
88+
controller.close();
89+
break;
90+
}
91+
92+
controller.enqueue(value);
93+
}
94+
},
95+
});
96+
}
97+
}
98+
99+
type ParsedStreamResponse =
100+
| {
101+
version: "v1";
102+
}
103+
| {
104+
version: "v2";
105+
accessToken: string;
106+
basin: string;
107+
flushIntervalMs?: number;
108+
maxRetries?: number;
109+
};
110+
111+
function parseCreateStreamResponse(
112+
version: string,
113+
headers: Record<string, string> | undefined
114+
): ParsedStreamResponse {
115+
if (version === "v1") {
116+
return { version: "v1" };
117+
}
118+
119+
const accessToken = headers?.["x-s2-access-token"];
120+
const basin = headers?.["x-s2-basin"];
121+
122+
if (!accessToken || !basin) {
123+
return { version: "v1" };
124+
}
125+
126+
const flushIntervalMs = headers?.["x-s2-flush-interval-ms"];
127+
const maxRetries = headers?.["x-s2-max-retries"];
128+
129+
return {
130+
version: "v2",
131+
accessToken,
132+
basin,
133+
flushIntervalMs: flushIntervalMs ? parseInt(flushIntervalMs) : undefined,
134+
maxRetries: maxRetries ? parseInt(maxRetries) : undefined,
135+
};
136+
}
137+
138+
async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {
139+
const reader = stream.getReader();
140+
try {
141+
while (true) {
142+
const { done, value } = await reader.read();
143+
if (done) return;
144+
yield value;
145+
}
146+
} finally {
147+
safeReleaseLock(reader);
148+
}
149+
}
150+
151+
function safeReleaseLock(reader: ReadableStreamDefaultReader<any>) {
152+
try {
153+
reader.releaseLock();
154+
} catch (error) {}
155+
}

packages/core/src/v3/realtimeStreams/types.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
import { AnyZodFetchOptions } from "../apiClient/core.js";
22
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33

4-
export type RealtimeAppendStreamOptions = {
4+
export type RealtimePipeStreamOptions = {
55
signal?: AbortSignal;
66
target?: string;
77
requestOptions?: AnyZodFetchOptions;
88
};
99

1010
export interface RealtimeStreamsManager {
11-
append<T>(
11+
pipe<T>(
1212
key: string,
1313
source: AsyncIterable<T> | ReadableStream<T>,
14-
options?: RealtimeAppendStreamOptions
15-
): Promise<RealtimeStreamInstance<T>>;
14+
options?: RealtimePipeStreamOptions
15+
): RealtimeStreamInstance<T>;
1616
}
1717

1818
export interface RealtimeStreamInstance<T> {

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ export class StandardMetadataManager implements RunMetadataManager {
319319
return $value;
320320
}
321321

322-
const streamInstance = await realtimeStreams.append(key, value, {
322+
const streamInstance = await realtimeStreams.pipe(key, value, {
323323
signal,
324324
target,
325325
});

packages/trigger-sdk/src/v3/metadata.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,14 @@ async function refreshMetadata(requestOptions?: ApiRequestOptions): Promise<void
230230
}
231231

232232
/**
233-
* @deprecated Use `streams.append()` instead.
233+
* @deprecated Use `streams.pipe()` instead.
234234
*/
235235
async function stream<T>(
236236
key: string,
237237
value: AsyncIterable<T> | ReadableStream<T>,
238238
signal?: AbortSignal
239239
): Promise<AsyncIterable<T>> {
240-
const streamInstance = await streams.append(key, value, {
240+
const streamInstance = await streams.pipe(key, value, {
241241
signal,
242242
});
243243

0 commit comments

Comments
 (0)