Skip to content

Commit 7cb27f2

Browse files
authored
feat(google): support streaming (#114)
1 parent be5aa49 commit 7cb27f2

29 files changed

+1313
-329
lines changed

deploy/test/index.spec.ts.snap

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ exports[`deploy > should call openai via gateway > llm 1`] = `
1414
},
1515
},
1616
],
17-
"created": 1761828474,
18-
"id": "chatcmpl-CWMMElxV7Z5jV4zs2g2cRQjZTsY8M",
17+
"created": 1762272055,
18+
"id": "chatcmpl-CYDklwaN7x9okuWTnABMCrZykoiRj",
1919
"model": "gpt-5-2025-08-07",
2020
"object": "chat.completion",
2121
"service_tier": "default",
@@ -118,7 +118,7 @@ exports[`deploy > should call openai via gateway > span 1`] = `
118118
{
119119
"key": "gen_ai.response.id",
120120
"value": {
121-
"stringValue": "chatcmpl-CWMMElxV7Z5jV4zs2g2cRQjZTsY8M",
121+
"stringValue": "chatcmpl-CYDklwaN7x9okuWTnABMCrZykoiRj",
122122
},
123123
},
124124
{
@@ -293,7 +293,7 @@ exports[`deploy > should call openai via gateway > span 1`] = `
293293
{
294294
"key": "http.response.body.text",
295295
"value": {
296-
"stringValue": "{"id":"chatcmpl-CWMMElxV7Z5jV4zs2g2cRQjZTsY8M","object":"chat.completion","created":1761828474,"model":"gpt-5-2025-08-07","choices":[{"index":0,"message":{"role":"assistant","content":"Paris.","refusal":null,"annotations":[]},"finish_reason":"stop"}],"usage":{"prompt_tokens":23,"completion_tokens":75,"total_tokens":98,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":64,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0},"pydantic_ai_gateway":{"cost_estimate":0.00077875}},"service_tier":"default","system_fingerprint":null}",
296+
"stringValue": "{"id":"chatcmpl-CYDklwaN7x9okuWTnABMCrZykoiRj","object":"chat.completion","created":1762272055,"model":"gpt-5-2025-08-07","choices":[{"index":0,"message":{"role":"assistant","content":"Paris.","refusal":null,"annotations":[]},"finish_reason":"stop"}],"usage":{"prompt_tokens":23,"completion_tokens":75,"total_tokens":98,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":64,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0},"pydantic_ai_gateway":{"cost_estimate":0.00077875}},"service_tier":"default","system_fingerprint":null}",
297297
},
298298
},
299299
{

gateway/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"@opentelemetry/resources": "^2.0.1",
1414
"@pydantic/genai-prices": "^0.0.35",
1515
"@pydantic/logfire-api": "^0.9.0",
16+
"@streamparser/json-whatwg": "^0.0.22",
1617
"eventsource-parser": "^3.0.6",
1718
"mime-types": "^3.0.1",
1819
"ts-pattern": "^5.8.0"

gateway/src/api/google.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ import type {
1919
TextPart,
2020
} from '../otel/genai'
2121
import { isMapping, type JsonData } from '../providers/default'
22-
import { BaseAPI } from './base'
22+
import { BaseAPI, type ExtractedRequest, type ExtractedResponse, type ExtractorConfig } from './base'
2323

2424
export { GenerateContentResponse } from '@google/genai'
2525

26-
export class GoogleAPI extends BaseAPI<GoogleRequest, GenerateContentResponse> {
26+
export class GoogleAPI extends BaseAPI<GoogleRequest, GenerateContentResponse, GenerateContentResponse> {
2727
requestStopSequences = (_request: GoogleRequest): string[] | undefined => {
2828
return _request.generationConfig?.stopSequences ?? undefined
2929
}
@@ -67,6 +67,28 @@ export class GoogleAPI extends BaseAPI<GoogleRequest, GenerateContentResponse> {
6767
systemInstructions = (_request: GoogleRequest): TextPart[] | undefined => {
6868
return systemInstructions(_request.systemInstruction)
6969
}
70+
71+
// SafeExtractor implementation
72+
73+
requestExtractors: ExtractorConfig<GoogleRequest, ExtractedRequest> = {
  // The request body argument is not read here: the value stored on
  // `extractedRequest` is taken from this instance's own `requestModel`.
  requestModel: (_body: GoogleRequest): void => {
    this.extractedRequest.requestModel = this.requestModel
  },
}
78+
79+
chunkExtractors: ExtractorConfig<GenerateContentResponse, ExtractedResponse> = {
  // Stores the usage derived from a chunk whenever that chunk carries
  // `usageMetadata`; a later chunk with usage replaces the stored value.
  usage: (chunk: GenerateContentResponse) => {
    if (!chunk.usageMetadata) {
      return
    }
    // TODO(Marcelo): This is likely to be wrong, since we are not summing the usage.
    this.extractedResponse.usage = this.extractUsage(chunk)
  },
  // Stores the chunk's `modelVersion` as the response model when it is set.
  responseModel: (chunk: GenerateContentResponse) => {
    const { modelVersion } = chunk
    if (modelVersion) {
      this.extractedResponse.responseModel = modelVersion
    }
  },
}
7092
}
7193

7294
function mapContent(content: Content): ChatMessage {

gateway/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ export async function gatewayFetch(
4545
ctx: ExecutionContext,
4646
options: GatewayOptions,
4747
): Promise<Response> {
48-
let { pathname: proxyPath } = url
48+
let { pathname: proxyPath, search: queryString } = url
4949
if (options.proxyPrefixLength) {
5050
proxyPath = proxyPath.slice(options.proxyPrefixLength)
5151
}
5252
try {
5353
if (proxyPath === '/') {
5454
return index(request, options)
5555
} else {
56-
return await gateway(request, proxyPath, ctx, options)
56+
return await gateway(request, `${proxyPath}${queryString}`, ctx, options)
5757
}
5858
} catch (error) {
5959
if (error instanceof ResponseError) {

gateway/src/providers/default.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,7 @@ export class DefaultProviderProxy {
321321
}
322322
}
323323

324-
const isStreaming =
325-
responseHeaders.get('content-type')?.startsWith('text/event-stream') ||
326-
('stream' in requestBodyData && requestBodyData.stream === true)
324+
const isStreaming = this.isStreaming(responseHeaders, requestBodyData)
327325
if (isStreaming) {
328326
return this.dispatchStreaming(prepResult, response, responseHeaders)
329327
}
@@ -465,6 +463,13 @@ export class DefaultProviderProxy {
465463
}
466464
}
467465

466+
/**
 * Decides whether the upstream response should be handled as a stream.
 *
 * @param responseHeaders - Headers of the upstream response.
 * @param requestBodyData - Parsed request body.
 * @returns `true` when the response content type starts with
 *   `text/event-stream` (compared case-insensitively) or the request body
 *   has `stream === true`.
 */
protected isStreaming(responseHeaders: Headers, requestBodyData: JsonData): boolean {
  const contentType = responseHeaders.get('content-type')?.toLowerCase()
  if (contentType?.startsWith('text/event-stream')) {
    return true
  }
  // No SSE content type: fall back to the explicit `stream` flag in the request body.
  return 'stream' in requestBodyData && requestBodyData.stream === true
}
472+
468473
protected isWhitelistedEndpoint(): boolean {
469474
return false
470475
}

gateway/src/providers/google/auth.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ResponseError } from '../../utils'
22

3-
export async function authToken(credentials: string, kv: KVNamespace): Promise<string> {
3+
export async function authToken(credentials: string, kv: KVNamespace, subFetch: typeof fetch): Promise<string> {
44
const serviceAccountHash = await hash(credentials)
55
const cacheKey = `gcp-auth:${serviceAccountHash}`
66
const cachedToken = await kv.get(cacheKey, { cacheTtl: 300 })
@@ -9,7 +9,7 @@ export async function authToken(credentials: string, kv: KVNamespace): Promise<s
99
}
1010
const serviceAccount = getServiceAccount(credentials)
1111
const jwt = await jwtSign(serviceAccount)
12-
const token = await getAccessToken(jwt)
12+
const token = await getAccessToken(jwt, subFetch)
1313
await kv.put(cacheKey, token, { expirationTtl: 3000 })
1414
return token
1515
}
@@ -80,10 +80,10 @@ async function jwtSign(serviceAccount: ServiceAccount): Promise<string> {
8080
return `${signingInput}.${b64UrlEncodeArray(signature)}`
8181
}
8282

83-
async function getAccessToken(jwt: string): Promise<string> {
83+
async function getAccessToken(jwt: string, subFetch: typeof fetch): Promise<string> {
8484
const body = new URLSearchParams({ grant_type: 'urn:ietf:params:oauth:grant-type:jwt-bearer', assertion: jwt })
8585

86-
const response = await fetch(tokenUrl, {
86+
const response = await subFetch(tokenUrl, {
8787
method: 'POST',
8888
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
8989
signal: AbortSignal.timeout(10000),

gateway/src/providers/google/index.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export class GoogleVertexProvider extends DefaultProviderProxy {
1919
if (!path) {
2020
return { error: 'Unable to parse path' }
2121
}
22-
return `${this.providerProxy.baseUrl}${path}`
22+
return `${stripTrailingSlash(this.providerProxy.baseUrl)}/${stripLeadingSlash(path)}`
2323
} else {
2424
return { error: 'baseUrl is required for the Google Provider' }
2525
}
@@ -72,7 +72,8 @@ export class GoogleVertexProvider extends DefaultProviderProxy {
7272
this.flavor = 'anthropic'
7373
}
7474

75-
return `/${version}/projects/${projectId}/locations/${region}/publishers/${publisher}/models/${modelAndApi}`
75+
const path = `/${version}/projects/${projectId}/locations/${region}/publishers/${publisher}/models/${modelAndApi}`
76+
return path
7677
}
7778

7879
async prepRequest() {
@@ -92,7 +93,7 @@ export class GoogleVertexProvider extends DefaultProviderProxy {
9293
}
9394

9495
/**
 * Sets the `Authorization` header to a bearer token obtained from `authToken`.
 *
 * @param headers - Outgoing request headers; mutated in place.
 */
async requestHeaders(headers: Headers): Promise<void> {
  const bearer = await authToken(
    this.providerProxy.credentials,
    this.options.kv,
    this.options.subFetch,
  )
  headers.set('Authorization', `Bearer ${bearer}`)
}
9899
}
@@ -104,10 +105,9 @@ export class GoogleVertexProvider extends DefaultProviderProxy {
104105
* @param url - The URL to extract the region from e.g. https://europe-west4-aiplatform.googleapis.com or https://aiplatform.googleapis.com.
105106
*/
106107
function regionFromUrl(url: string): null | string {
  // The region prefix may itself contain hyphens ("europe-west4", "us-central1"),
  // so the capture group must allow them. A `[^-]+` group stops at the first
  // hyphen, fails to match, and silently reports every such endpoint as 'global'.
  // The greedy group backtracks to the final "-aiplatform" suffix, so the whole
  // region is captured. A single trailing slash on the base URL is tolerated.
  const match = url.match(/^https:\/\/([a-z0-9-]+)-aiplatform\.googleapis\.com\/?$/)
  // No regional prefix (e.g. https://aiplatform.googleapis.com) means the global endpoint.
  return match?.[1] ?? 'global'
}
111+
112+
/** Removes one trailing `/` from `url`, if present. */
const stripTrailingSlash = (url: string): string => {
  if (!url.endsWith('/')) {
    return url
  }
  return url.slice(0, -1)
}
113+
/** Removes one leading `/` from `url`, if present. */
const stripLeadingSlash = (url: string): string => {
  if (!url.startsWith('/')) {
    return url
  }
  return url.slice(1)
}

gateway/test/env.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ interface Env {
66
GROQ_API_KEY: string
77
ANTHROPIC_API_KEY: string
88
AWS_BEARER_TOKEN_BEDROCK: string
9+
GOOGLE_SERVICE_ACCOUNT_KEY: string
910
}
1011

1112
declare module 'cloudflare:test' {

gateway/test/gateway.spec.ts.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ exports[`custom proxyPrefixLength > inference > proxyPrefixLength 1`] = `
1414
},
1515
},
1616
],
17-
"created": 1761823178,
18-
"id": "chatcmpl-CWKyoLFrrxfDdUZO6hAaDA7rYn3Fo",
17+
"created": 1762271642,
18+
"id": "chatcmpl-CYDe6BCWOKGGGTlQLofyQ2DP3QTRV",
1919
"model": "gpt-5-2025-08-07",
2020
"object": "chat.completion",
2121
"service_tier": "default",

gateway/test/providers/anthropic.spec.ts.snap

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1130,7 +1130,7 @@ exports[`anthropic > should call anthropic via gateway with stream > span 1`] =
11301130
{
11311131
"key": "logfire.json_schema",
11321132
"value": {
1133-
"stringValue": "{"type":"object","properties":{"gen_ai.system":{"type":"string"},"gen_ai.operation.name":{"type":"string"},"gen_ai.request.model":{"type":"string"},"gen_ai.request.max_tokens":{"type":"number"},"gen_ai.response.model":{"type":"string"},"gen_ai.response.id":{"type":"string"},"gen_ai.usage.input_tokens":{"type":"number"},"gen_ai.usage.cache_read_tokens":{"type":"number"},"gen_ai.usage.cache_write_tokens":{"type":"number"},"gen_ai.usage.output_tokens":{"type":"number"},"http.request.method":{"type":"string"},"url.full":{"type":"string"},"http.request.header.accept":{"type":"string"},"http.request.header.anthropic-version":{"type":"string"},"http.request.header.authorization":{"type":"string"},"http.request.header.content-type":{"type":"string"},"http.request.header.user-agent":{"type":"string"},"http.request.header.x-stainless-arch":{"type":"string"},"http.request.header.x-stainless-lang":{"type":"string"},"http.request.header.x-stainless-os":{"type":"string"},"http.request.header.x-stainless-package-version":{"type":"string"},"http.request.header.x-stainless-retry-count":{"type":"string"},"http.request.header.x-stainless-runtime":{"type":"string"},"http.request.header.x-stainless-runtime-version":{"type":"string"},"http.request.header.x-stainless-timeout":{"type":"string"},"http.response.status_code":{"type":"number"},"http.response.header.server":{"type":"string"},"http.response.header.transfer-encoding":{"type":"string"}}}",
1133+
"stringValue": "{"type":"object","properties":{"gen_ai.system":{"type":"string"},"gen_ai.operation.name":{"type":"string"},"gen_ai.request.model":{"type":"string"},"gen_ai.request.max_tokens":{"type":"number"},"gen_ai.response.model":{"type":"string"},"gen_ai.response.id":{"type":"string"},"gen_ai.usage.input_tokens":{"type":"number"},"gen_ai.usage.cache_read_tokens":{"type":"number"},"gen_ai.usage.cache_write_tokens":{"type":"number"},"gen_ai.usage.output_tokens":{"type":"number"},"http.request.method":{"type":"string"},"url.full":{"type":"string"},"http.request.header.accept":{"type":"string"},"http.request.header.anthropic-version":{"type":"string"},"http.request.header.authorization":{"type":"string"},"http.request.header.content-type":{"type":"string"},"http.request.header.user-agent":{"type":"string"},"http.request.header.x-stainless-arch":{"type":"string"},"http.request.header.x-stainless-lang":{"type":"string"},"http.request.header.x-stainless-os":{"type":"string"},"http.request.header.x-stainless-package-version":{"type":"string"},"http.request.header.x-stainless-retry-count":{"type":"string"},"http.request.header.x-stainless-runtime":{"type":"string"},"http.request.header.x-stainless-runtime-version":{"type":"string"},"http.request.header.x-stainless-timeout":{"type":"string"},"http.response.status_code":{"type":"number"},"http.response.header.content-type":{"type":"string"},"http.response.header.server":{"type":"string"},"http.response.header.transfer-encoding":{"type":"string"}}}",
11341134
},
11351135
},
11361136
{
@@ -1295,6 +1295,12 @@ exports[`anthropic > should call anthropic via gateway with stream > span 1`] =
12951295
"intValue": 200,
12961296
},
12971297
},
1298+
{
1299+
"key": "http.response.header.content-type",
1300+
"value": {
1301+
"stringValue": "text/event-stream; charset=utf-8",
1302+
},
1303+
},
12981304
{
12991305
"key": "http.response.header.server",
13001306
"value": {

0 commit comments

Comments
 (0)