File tree Expand file tree Collapse file tree 2 files changed +36
-2
lines changed Expand file tree Collapse file tree 2 files changed +36
-2
lines changed Original file line number Diff line number Diff line change 11import {
22 AsyncIterableStream ,
33 createAsyncIterableStreamFromAsyncIterable ,
4+ ensureAsyncIterable ,
45} from "../streams/asyncIterableStream.js" ;
56import {
67 RealtimeAppendStreamOptions ,
@@ -30,6 +31,9 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
3031 source : AsyncIterable < T > | ReadableStream < T > ,
3132 options ?: RealtimeAppendStreamOptions
3233 ) : Promise < RealtimeStreamInstance < T > > {
34+ // Normalize ReadableStream to AsyncIterable
35+ const asyncIterableSource = ensureAsyncIterable ( source ) ;
36+
3337 const runId = getRunIdForOptions ( options ) ;
3438
3539 if ( ! runId ) {
@@ -52,7 +56,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5256 ? new MetadataStream ( {
5357 key,
5458 runId,
55- source,
59+ source : asyncIterableSource ,
5660 baseUrl : this . baseUrl ,
5761 headers : this . apiClient . getHeaders ( ) ,
5862 signal : options ?. signal ,
@@ -63,7 +67,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
6367 basin : parsedResponse . basin ,
6468 stream : key ,
6569 accessToken : parsedResponse . accessToken ,
66- source,
70+ source : asyncIterableSource ,
6771 signal : options ?. signal ,
6872 limiter : ( await import ( "p-limit" ) ) . default ,
6973 debug : this . debug ,
Original file line number Diff line number Diff line change @@ -103,3 +103,33 @@ export function createAsyncIterableStreamFromAsyncGenerator<T>(
103103) : AsyncIterableStream < T > {
104104 return createAsyncIterableStreamFromAsyncIterable ( asyncGenerator , transformer , signal ) ;
105105}
106+
107+ export function ensureAsyncIterable < T > (
108+ input : AsyncIterable < T > | ReadableStream < T >
109+ ) : AsyncIterable < T > {
110+ // If it's already an AsyncIterable, return it as-is
111+ if ( Symbol . asyncIterator in input ) {
112+ return input as AsyncIterable < T > ;
113+ }
114+
115+ // Convert ReadableStream to AsyncIterable
116+ const readableStream = input as ReadableStream < T > ;
117+ return {
118+ async * [ Symbol . asyncIterator ] ( ) {
119+ const reader = readableStream . getReader ( ) ;
120+ try {
121+ while ( true ) {
122+ const { done, value } = await reader . read ( ) ;
123+ if ( done ) {
124+ break ;
125+ }
126+ if ( value !== undefined ) {
127+ yield value ;
128+ }
129+ }
130+ } finally {
131+ reader . releaseLock ( ) ;
132+ }
133+ } ,
134+ } ;
135+ }
You can’t perform that action at this time.
0 commit comments