Skip to content

Commit 41ba9bc

Browse files
feat: improve responses API implementation with PR ericc-ch#119 enhancements
Add critical keep-alive streaming support: - Implement startStreamPing utility with 3s interval pings - Prevents client timeouts on long reasoning operations - Fixes Claude Code retry issues with stream=false fallback Improve error handling and robustness: - Add try/catch/finally blocks in all stream handlers - Proper cleanup of ping intervals on stream completion - Better error propagation and logging Add signature field support: - Update AnthropicThinkingBlock type with optional signature field - Support for signature_delta in content block delta events Add comprehensive test coverage: - Create tests/responses-translation.test.ts (258 lines) - Create tests/responses-stream-translation.test.ts (250 lines) - Test non-streaming responses with reasoning and tool calls - Test streaming events including errors and failures - All 36 tests passing Update ESLint config: - Disable max-lines-per-function for test files This brings responses API implementation to production-ready quality based on best practices from PR ericc-ch#119, while maintaining bypass credit support and existing functionality. Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
1 parent 4697d6c commit 41ba9bc

File tree

7 files changed

+644
-65
lines changed

7 files changed

+644
-65
lines changed

eslint.config.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
import config from "@echristian/eslint-config"
22

3-
export default config({
4-
prettier: {
5-
plugins: ["prettier-plugin-packagejson"],
3+
export default config(
4+
{
5+
prettier: {
6+
plugins: ["prettier-plugin-packagejson"],
7+
},
68
},
7-
})
9+
{
10+
files: ["tests/**/*.test.ts"],
11+
rules: {
12+
"max-lines-per-function": "off",
13+
},
14+
},
15+
)

src/lib/utils.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import type { SSEStreamingApi } from "hono/streaming"
2+
13
import consola from "consola"
24

35
import { getModels } from "~/services/copilot/get-models"
@@ -24,3 +26,33 @@ export const cacheVSCodeVersion = async () => {
2426

2527
consola.info(`Using VSCode version: ${response}`)
2628
}
29+
30+
/**
31+
* Starts a periodic ping for SSE streams to maintain client connections.
32+
* Critical for long-running operations like reasoning where the model may not
33+
* send data for extended periods (10s+). Without pings, clients like Claude Code
34+
* will timeout and retry with stream=false, causing double billing.
35+
*
36+
* @param stream - The SSE stream to send pings to
37+
* @param intervalMs - Interval between pings in milliseconds (default: 3000ms)
38+
* @returns The interval ID for cleanup
39+
*/
40+
export const startStreamPing = (
41+
stream: SSEStreamingApi,
42+
intervalMs: number = 3000,
43+
) => {
44+
const pingInterval = setInterval(async () => {
45+
try {
46+
await stream.writeSSE({
47+
event: "ping",
48+
data: "",
49+
})
50+
consola.debug("Sent ping")
51+
} catch (error) {
52+
consola.warn("Failed to send ping:", error)
53+
clearInterval(pingInterval)
54+
}
55+
}, intervalMs)
56+
57+
return pingInterval
58+
}

src/routes/messages/anthropic-types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export interface AnthropicToolUseBlock {
5656
export interface AnthropicThinkingBlock {
5757
type: "thinking"
5858
thinking: string
59+
signature?: string | null
5960
}
6061

6162
export type AnthropicUserContentBlock =
@@ -133,7 +134,7 @@ export interface AnthropicContentBlockStartEvent {
133134
| (Omit<AnthropicToolUseBlock, "input"> & {
134135
input: Record<string, unknown>
135136
})
136-
| { type: "thinking"; thinking: string }
137+
| { type: "thinking"; thinking: string; signature?: string | null }
137138
}
138139

139140
export interface AnthropicContentBlockDeltaEvent {

src/routes/messages/handler.ts

Lines changed: 72 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { awaitApproval } from "~/lib/approval"
77
import { checkRateLimit } from "~/lib/rate-limit"
88
import { logRequest } from "~/lib/request-logger"
99
import { state } from "~/lib/state"
10+
import { startStreamPing } from "~/lib/utils"
1011
import {
1112
createResponsesStreamState,
1213
translateResponsesStreamEvent,
@@ -93,33 +94,42 @@ const handleWithChatCompletions = async (
9394

9495
consola.debug("Streaming response from Copilot")
9596
return streamSSE(c, async (stream) => {
97+
const pingInterval = startStreamPing(stream)
9698
const streamState: AnthropicStreamState = {
9799
messageStartSent: false,
98100
contentBlockIndex: 0,
99101
contentBlockOpen: false,
100102
toolCalls: {},
101103
}
102104

103-
for await (const rawEvent of response) {
104-
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
105-
if (rawEvent.data === "[DONE]") {
106-
break
107-
}
105+
try {
106+
for await (const rawEvent of response) {
107+
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
108+
if (rawEvent.data === "[DONE]") {
109+
break
110+
}
108111

109-
if (!rawEvent.data) {
110-
continue
111-
}
112+
if (!rawEvent.data) {
113+
continue
114+
}
112115

113-
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
114-
const events = translateChunkToAnthropicEvents(chunk, streamState)
116+
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
117+
const events = translateChunkToAnthropicEvents(chunk, streamState)
115118

116-
for (const event of events) {
117-
consola.debug("Translated Anthropic event:", JSON.stringify(event))
118-
await stream.writeSSE({
119-
event: event.type,
120-
data: JSON.stringify(event),
121-
})
119+
for (const event of events) {
120+
consola.debug("Translated Anthropic event:", JSON.stringify(event))
121+
await stream.writeSSE({
122+
event: event.type,
123+
data: JSON.stringify(event),
124+
})
125+
}
122126
}
127+
} catch (error) {
128+
consola.error("Error in chat completions stream:", error)
129+
throw error
130+
} finally {
131+
clearInterval(pingInterval)
132+
consola.debug("Chat completions stream completed, ping stopped")
123133
}
124134
})
125135
}
@@ -144,50 +154,59 @@ const handleWithResponsesApi = async (
144154
if (responsesPayload.stream && isAsyncIterable(response)) {
145155
consola.debug("Streaming response from Copilot (Responses API)")
146156
return streamSSE(c, async (stream) => {
157+
const pingInterval = startStreamPing(stream)
147158
const streamState = createResponsesStreamState()
148159

149-
for await (const chunk of response) {
150-
consola.debug("Responses raw stream event:", JSON.stringify(chunk))
151-
152-
const eventName = (chunk as { event?: string }).event
153-
if (eventName === "ping") {
154-
await stream.writeSSE({ event: "ping", data: "" })
155-
continue
156-
}
157-
158-
const data = (chunk as { data?: string }).data
159-
if (!data) {
160-
continue
161-
}
162-
163-
if (data === "[DONE]") {
164-
break
160+
try {
161+
for await (const chunk of response) {
162+
consola.debug("Responses raw stream event:", JSON.stringify(chunk))
163+
164+
const eventName = (chunk as { event?: string }).event
165+
if (eventName === "ping") {
166+
await stream.writeSSE({ event: "ping", data: "" })
167+
continue
168+
}
169+
170+
const data = (chunk as { data?: string }).data
171+
if (!data) {
172+
continue
173+
}
174+
175+
if (data === "[DONE]") {
176+
break
177+
}
178+
179+
const parsed = safeJsonParse(data)
180+
if (!parsed) {
181+
continue
182+
}
183+
184+
const events = translateResponsesStreamEvent(parsed, streamState)
185+
for (const event of events) {
186+
consola.debug("Translated Anthropic event:", JSON.stringify(event))
187+
await stream.writeSSE({
188+
event: event.type,
189+
data: JSON.stringify(event),
190+
})
191+
}
165192
}
166193

167-
const parsed = safeJsonParse(data)
168-
if (!parsed) {
169-
continue
170-
}
171-
172-
const events = translateResponsesStreamEvent(parsed, streamState)
173-
for (const event of events) {
174-
consola.debug("Translated Anthropic event:", JSON.stringify(event))
194+
if (!streamState.messageCompleted) {
195+
consola.warn(
196+
"Responses stream ended without completion; sending fallback message_stop",
197+
)
198+
const fallback = { type: "message_stop" as const }
175199
await stream.writeSSE({
176-
event: event.type,
177-
data: JSON.stringify(event),
200+
event: fallback.type,
201+
data: JSON.stringify(fallback),
178202
})
179203
}
180-
}
181-
182-
if (!streamState.messageCompleted) {
183-
consola.warn(
184-
"Responses stream ended without completion; sending fallback message_stop",
185-
)
186-
const fallback = { type: "message_stop" as const }
187-
await stream.writeSSE({
188-
event: fallback.type,
189-
data: JSON.stringify(fallback),
190-
})
204+
} catch (error) {
205+
consola.error("Error in Responses API stream:", error)
206+
throw error
207+
} finally {
208+
clearInterval(pingInterval)
209+
consola.debug("Responses API stream completed, ping stopped")
191210
}
192211
})
193212
}

src/routes/responses/handler.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { processResponsesInputWithBypass } from "~/lib/bypass-credit"
88
import { checkRateLimit } from "~/lib/rate-limit"
99
import { logRequest } from "~/lib/request-logger"
1010
import { state } from "~/lib/state"
11+
import { startStreamPing } from "~/lib/utils"
1112
import {
1213
createResponses,
1314
type ResponsesPayload,
@@ -74,13 +75,23 @@ export const handleResponses = async (c: Context) => {
7475
if (isStreamingRequested(processedPayload) && isAsyncIterable(response)) {
7576
consola.debug("Forwarding native Responses stream")
7677
return streamSSE(c, async (stream) => {
77-
for await (const chunk of response) {
78-
consola.debug("Responses stream chunk:", JSON.stringify(chunk))
79-
await stream.writeSSE({
80-
id: (chunk as { id?: string }).id,
81-
event: (chunk as { event?: string }).event,
82-
data: (chunk as { data?: string }).data ?? "",
83-
})
78+
const pingInterval = startStreamPing(stream)
79+
80+
try {
81+
for await (const chunk of response) {
82+
consola.debug("Responses stream chunk:", JSON.stringify(chunk))
83+
await stream.writeSSE({
84+
id: (chunk as { id?: string }).id,
85+
event: (chunk as { event?: string }).event,
86+
data: (chunk as { data?: string }).data ?? "",
87+
})
88+
}
89+
} catch (error) {
90+
consola.error("Error in Responses stream:", error)
91+
throw error
92+
} finally {
93+
clearInterval(pingInterval)
94+
consola.debug("Responses stream completed, ping stopped")
8495
}
8596
})
8697
}

0 commit comments

Comments
 (0)