Skip to content

Commit a0f88db

Browse files
committed
Normalize the stream source to an async iterable before passing to the writers
1 parent 1a1ebad commit a0f88db

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
AsyncIterableStream,
33
createAsyncIterableStreamFromAsyncIterable,
4+
ensureAsyncIterable,
45
} from "../streams/asyncIterableStream.js";
56
import {
67
RealtimeAppendStreamOptions,
@@ -30,6 +31,9 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
3031
source: AsyncIterable<T> | ReadableStream<T>,
3132
options?: RealtimeAppendStreamOptions
3233
): Promise<RealtimeStreamInstance<T>> {
34+
// Normalize ReadableStream to AsyncIterable
35+
const asyncIterableSource = ensureAsyncIterable(source);
36+
3337
const runId = getRunIdForOptions(options);
3438

3539
if (!runId) {
@@ -52,7 +56,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5256
? new MetadataStream({
5357
key,
5458
runId,
55-
source,
59+
source: asyncIterableSource,
5660
baseUrl: this.baseUrl,
5761
headers: this.apiClient.getHeaders(),
5862
signal: options?.signal,
@@ -63,7 +67,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
6367
basin: parsedResponse.basin,
6468
stream: key,
6569
accessToken: parsedResponse.accessToken,
66-
source,
70+
source: asyncIterableSource,
6771
signal: options?.signal,
6872
limiter: (await import("p-limit")).default,
6973
debug: this.debug,

packages/core/src/v3/streams/asyncIterableStream.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,33 @@ export function createAsyncIterableStreamFromAsyncGenerator<T>(
103103
): AsyncIterableStream<T> {
104104
return createAsyncIterableStreamFromAsyncIterable(asyncGenerator, transformer, signal);
105105
}
106+
107+
export function ensureAsyncIterable<T>(
108+
input: AsyncIterable<T> | ReadableStream<T>
109+
): AsyncIterable<T> {
110+
// If it's already an AsyncIterable, return it as-is
111+
if (Symbol.asyncIterator in input) {
112+
return input as AsyncIterable<T>;
113+
}
114+
115+
// Convert ReadableStream to AsyncIterable
116+
const readableStream = input as ReadableStream<T>;
117+
return {
118+
async *[Symbol.asyncIterator]() {
119+
const reader = readableStream.getReader();
120+
try {
121+
while (true) {
122+
const { done, value } = await reader.read();
123+
if (done) {
124+
break;
125+
}
126+
if (value !== undefined) {
127+
yield value;
128+
}
129+
}
130+
} finally {
131+
reader.releaseLock();
132+
}
133+
},
134+
};
135+
}

0 commit comments

Comments
 (0)