Skip to content

Commit 8d2625a

Browse files
committed
Refactor the metadata streams stuff to be better
1 parent a0f88db commit 8d2625a

File tree

6 files changed

+43
-43
lines changed

6 files changed

+43
-43
lines changed

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import {
1010
} from "./types.js";
1111
import { taskContext } from "../task-context-api.js";
1212
import { ApiClient } from "../apiClient/index.js";
13-
import { MetadataStream } from "../runMetadata/metadataStream.js";
14-
import { S2MetadataStream } from "../runMetadata/s2MetadataStream.js";
13+
import { StreamsWriterV1 } from "./streamsWriterV1.js";
14+
import { StreamsWriterV2 } from "./streamsWriterV2.js";
1515

1616
export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
1717
constructor(
@@ -53,7 +53,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
5353

5454
const streamInstance =
5555
parsedResponse.version === "v1"
56-
? new MetadataStream({
56+
? new StreamsWriterV1({
5757
key,
5858
runId,
5959
source: asyncIterableSource,
@@ -63,7 +63,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
6363
version,
6464
target: "self",
6565
})
66-
: new S2MetadataStream({
66+
: new StreamsWriterV2({
6767
basin: parsedResponse.basin,
6868
stream: key,
6969
accessToken: parsedResponse.accessToken,

packages/core/src/v3/runMetadata/metadataStream.ts renamed to packages/core/src/v3/realtimeStreams/streamsWriterV1.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import { request as httpsRequest } from "node:https";
22
import { request as httpRequest } from "node:http";
33
import { URL } from "node:url";
44
import { randomBytes } from "node:crypto";
5-
import type { StreamInstance } from "./types.js";
5+
import { StreamsWriter } from "./types.js";
66

7-
export type MetadataOptions<T> = {
7+
export type StreamsWriterV1Options<T> = {
88
baseUrl: string;
99
runId: string;
1010
key: string;
@@ -23,7 +23,7 @@ interface BufferedChunk<T> {
2323
data: T;
2424
}
2525

26-
export class MetadataStream<T> implements StreamInstance {
26+
export class StreamsWriterV1<T> implements StreamsWriter {
2727
private controller = new AbortController();
2828
private serverStream: ReadableStream<T>;
2929
private consumerStream: ReadableStream<T>;
@@ -42,7 +42,7 @@ export class MetadataStream<T> implements StreamInstance {
4242
private bufferReaderTask: Promise<void> | null = null;
4343
private streamComplete = false;
4444

45-
constructor(private options: MetadataOptions<T>) {
45+
constructor(private options: StreamsWriterV1Options<T>) {
4646
const [serverStream, consumerStream] = this.createTeeStreams();
4747
this.serverStream = serverStream;
4848
this.consumerStream = consumerStream;

packages/core/src/v3/runMetadata/s2MetadataStream.ts renamed to packages/core/src/v3/realtimeStreams/streamsWriterV2.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { S2 } from "@s2-dev/streamstore";
2-
import type { StreamInstance } from "./types.js";
2+
import { StreamsWriter } from "./types.js";
33

44
type LimitFunction = {
55
readonly activeCount: number;
@@ -11,7 +11,7 @@ type LimitFunction = {
1111
): Promise<ReturnType>;
1212
};
1313

14-
export type S2MetadataStreamOptions<T = any> = {
14+
export type StreamsWriterV2Options<T = any> = {
1515
basin: string;
1616
stream: string;
1717
accessToken: string;
@@ -24,7 +24,7 @@ export type S2MetadataStreamOptions<T = any> = {
2424
};
2525

2626
/**
27-
* S2MetadataStream writes metadata stream data directly to S2 (https://s2.dev).
27+
* StreamsWriterV2 writes metadata stream data directly to S2 (https://s2.dev).
2828
*
2929
* Features:
3030
* - Batching: Reads chunks as fast as possible and buffers them
@@ -53,7 +53,7 @@ export type S2MetadataStreamOptions<T = any> = {
5353
* }
5454
* ```
5555
*/
56-
export class S2MetadataStream<T = any> implements StreamInstance {
56+
export class StreamsWriterV2<T = any> implements StreamsWriter {
5757
private s2Client: S2;
5858
private serverStream: ReadableStream<T>;
5959
private consumerStream: ReadableStream<T>;
@@ -76,7 +76,7 @@ export class S2MetadataStream<T = any> implements StreamInstance {
7676
private readonly baseDelayMs = 1000;
7777
private readonly maxDelayMs = 30000;
7878

79-
constructor(private options: S2MetadataStreamOptions<T>) {
79+
constructor(private options: StreamsWriterV2Options<T>) {
8080
this.limiter = options.limiter(1);
8181
this.debug = options.debug ?? false;
8282

packages/core/src/v3/realtimeStreams/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,7 @@ export interface RealtimeStreamInstance<T> {
1919
wait(): Promise<void>;
2020
get stream(): AsyncIterableStream<T>;
2121
}
22+
23+
export interface StreamsWriter {
24+
wait(): Promise<void>;
25+
}

packages/core/src/v3/runMetadata/types.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,3 @@ export interface RunMetadataManager extends RunMetadataUpdater {
2929
get parent(): RunMetadataUpdater;
3030
get root(): RunMetadataUpdater;
3131
}
32-
33-
export interface StreamInstance {
34-
wait(): Promise<void>;
35-
}

packages/core/test/metadataStream.test.ts renamed to packages/core/test/streamsWriterV1.test.ts

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { describe, it, expect, beforeEach, afterEach } from "vitest";
22
import { createServer, Server, IncomingMessage, ServerResponse } from "node:http";
33
import { AddressInfo } from "node:net";
4-
import { MetadataStream } from "../src/v3/runMetadata/metadataStream.js";
4+
import { StreamsWriterV1 } from "../src/v3/realtimeStreams/streamsWriterV1.js";
55

66
type RequestHandler = (req: IncomingMessage, res: ServerResponse) => void;
77

8-
describe("MetadataStream", () => {
8+
describe("StreamsWriterV1", () => {
99
let server: Server;
1010
let baseUrl: string;
1111
let requestHandler: RequestHandler | null = null;
@@ -67,7 +67,7 @@ describe("MetadataStream", () => {
6767
yield { chunk: 2, data: "chunk 2" };
6868
}
6969

70-
const metadataStream = new MetadataStream({
70+
const metadataStream = new StreamsWriterV1({
7171
baseUrl,
7272
runId: "run_123",
7373
key: "test-stream",
@@ -95,7 +95,7 @@ describe("MetadataStream", () => {
9595
yield { chunk: 0 };
9696
}
9797

98-
const metadataStream = new MetadataStream({
98+
const metadataStream = new StreamsWriterV1({
9999
baseUrl,
100100
runId: "run_123",
101101
key: "test-stream",
@@ -138,7 +138,7 @@ describe("MetadataStream", () => {
138138
yield { chunk: 2 };
139139
}
140140

141-
const metadataStream = new MetadataStream({
141+
const metadataStream = new StreamsWriterV1({
142142
baseUrl,
143143
runId: "run_123",
144144
key: "test-stream",
@@ -187,7 +187,7 @@ describe("MetadataStream", () => {
187187
yield { chunk: 0 };
188188
}
189189

190-
const metadataStream = new MetadataStream({
190+
const metadataStream = new StreamsWriterV1({
191191
baseUrl,
192192
runId: "run_123",
193193
key: "test-stream",
@@ -214,7 +214,7 @@ describe("MetadataStream", () => {
214214

215215
if (requestCount === 1) {
216216
// First request - don't respond, let it timeout
217-
// (timeout is set to 15 minutes in MetadataStream, so we can't actually test this easily)
217+
// (timeout is set to 15 minutes in StreamsWriterV1, so we can't actually test this easily)
218218
// Instead we'll just delay and then respond
219219
setTimeout(() => {
220220
res.writeHead(200);
@@ -231,7 +231,7 @@ describe("MetadataStream", () => {
231231
yield { chunk: 0 };
232232
}
233233

234-
const metadataStream = new MetadataStream({
234+
const metadataStream = new StreamsWriterV1({
235235
baseUrl,
236236
runId: "run_123",
237237
key: "test-stream",
@@ -274,7 +274,7 @@ describe("MetadataStream", () => {
274274
}
275275
}
276276

277-
const metadataStream = new MetadataStream({
277+
const metadataStream = new StreamsWriterV1({
278278
baseUrl,
279279
runId: "run_123",
280280
key: "test-stream",
@@ -319,7 +319,7 @@ describe("MetadataStream", () => {
319319
yield { chunk: 0 };
320320
}
321321

322-
const metadataStream = new MetadataStream({
322+
const metadataStream = new StreamsWriterV1({
323323
baseUrl,
324324
runId: "run_123",
325325
key: "test-stream",
@@ -366,7 +366,7 @@ describe("MetadataStream", () => {
366366
yield { chunk: 1 };
367367
}
368368

369-
const metadataStream = new MetadataStream({
369+
const metadataStream = new StreamsWriterV1({
370370
baseUrl,
371371
runId: "run_123",
372372
key: "test-stream",
@@ -414,7 +414,7 @@ describe("MetadataStream", () => {
414414
yield { chunk: 0 };
415415
}
416416

417-
const metadataStream = new MetadataStream({
417+
const metadataStream = new StreamsWriterV1({
418418
baseUrl,
419419
runId: "run_123",
420420
key: "test-stream",
@@ -455,7 +455,7 @@ describe("MetadataStream", () => {
455455
yield { chunk: 0 };
456456
}
457457

458-
const metadataStream = new MetadataStream({
458+
const metadataStream = new StreamsWriterV1({
459459
baseUrl,
460460
runId: "run_123",
461461
key: "test-stream",
@@ -478,7 +478,7 @@ describe("MetadataStream", () => {
478478
}
479479
}
480480

481-
const metadataStream = new MetadataStream({
481+
const metadataStream = new StreamsWriterV1({
482482
baseUrl,
483483
runId: "run_123",
484484
key: "test-stream",
@@ -528,7 +528,7 @@ describe("MetadataStream", () => {
528528
}
529529
}
530530

531-
const metadataStream = new MetadataStream({
531+
const metadataStream = new StreamsWriterV1({
532532
baseUrl,
533533
runId: "run_123",
534534
key: "test-stream",
@@ -587,7 +587,7 @@ describe("MetadataStream", () => {
587587
yield { chunk: 0 };
588588
}
589589

590-
const metadataStream = new MetadataStream({
590+
const metadataStream = new StreamsWriterV1({
591591
baseUrl,
592592
runId: "run_123",
593593
key: "test-stream",
@@ -637,7 +637,7 @@ describe("MetadataStream", () => {
637637
}
638638
}
639639

640-
const metadataStream = new MetadataStream({
640+
const metadataStream = new StreamsWriterV1({
641641
baseUrl,
642642
runId: "run_123",
643643
key: "test-stream",
@@ -659,7 +659,7 @@ describe("MetadataStream", () => {
659659
yield { chunk: 2, data: "data 2" };
660660
}
661661

662-
const metadataStream = new MetadataStream({
662+
const metadataStream = new StreamsWriterV1({
663663
baseUrl,
664664
runId: "run_123",
665665
key: "test-stream",
@@ -697,7 +697,7 @@ describe("MetadataStream", () => {
697697
yield { chunk: 0 };
698698
}
699699

700-
const metadataStream = new MetadataStream({
700+
const metadataStream = new StreamsWriterV1({
701701
baseUrl,
702702
runId: "run_123",
703703
key: "test-stream",
@@ -739,7 +739,7 @@ describe("MetadataStream", () => {
739739
yield { chunk: 0 };
740740
}
741741

742-
const metadataStream = new MetadataStream({
742+
const metadataStream = new StreamsWriterV1({
743743
baseUrl,
744744
runId: "run_123",
745745
key: "test-stream",
@@ -770,7 +770,7 @@ describe("MetadataStream", () => {
770770
yield { chunk: 1 };
771771
}
772772

773-
const metadataStream = new MetadataStream({
773+
const metadataStream = new StreamsWriterV1({
774774
baseUrl,
775775
runId: "run_123",
776776
key: "test-stream",
@@ -794,7 +794,7 @@ describe("MetadataStream", () => {
794794
return;
795795
}
796796

797-
const metadataStream = new MetadataStream({
797+
const metadataStream = new StreamsWriterV1({
798798
baseUrl,
799799
runId: "run_123",
800800
key: "test-stream",
@@ -820,7 +820,7 @@ describe("MetadataStream", () => {
820820
// Note: Throwing here would test error handling, but causes test infrastructure issues
821821
}
822822

823-
const metadataStream = new MetadataStream({
823+
const metadataStream = new StreamsWriterV1({
824824
baseUrl,
825825
runId: "run_123",
826826
key: "test-stream",
@@ -861,7 +861,7 @@ describe("MetadataStream", () => {
861861
yield { chunk: 1 };
862862
}
863863

864-
const metadataStream = new MetadataStream({
864+
const metadataStream = new StreamsWriterV1({
865865
baseUrl,
866866
runId: "run_123",
867867
key: "test-stream",
@@ -911,7 +911,7 @@ describe("MetadataStream", () => {
911911
yield { chunk: 0 };
912912
}
913913

914-
const metadataStream = new MetadataStream({
914+
const metadataStream = new StreamsWriterV1({
915915
baseUrl,
916916
runId: "run_123",
917917
key: "test-stream",
@@ -957,7 +957,7 @@ describe("MetadataStream", () => {
957957
}
958958
}
959959

960-
const metadataStream = new MetadataStream({
960+
const metadataStream = new StreamsWriterV1({
961961
baseUrl,
962962
runId: "run_123",
963963
key: "test-stream",

0 commit comments

Comments
 (0)