Commit 6017ac3

refactor(otel): simplify otel calls on whitelisted endpoints (#109)
1 parent 2d42187 commit 6017ac3

7 files changed, +293 −55 lines changed


gateway/package.json

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
   "main": "src/index.ts",
   "scripts": {
     "typecheck": "tsgo --noEmit && cd test && tsgo --noEmit",
-    "test": "vitest"
+    "test": "vitest --reporter=verbose"
   },
   "dependencies": {
     "@opentelemetry/api": "^1.9.0",

gateway/src/gateway.ts

Lines changed: 12 additions & 13 deletions
@@ -64,32 +64,31 @@ export async function gateway(
   const result = await proxy.dispatch()

   // This doesn't work on streaming because the `result` object is returned as soon as we create the streaming response.
-  if (!('responseStream' in result) && !('httpStatusCode' in result)) {
+  if (!('responseStream' in result) && !('response' in result)) {
     const [spanName, attributes, level] = genAiOtelAttributes(result, proxy)
     dispatchSpan.end(spanName, attributes, { level })
   }

   let response: Response
-  if ('httpStatusCode' in result) {
-    const { httpStatusCode, responseHeaders, responseBody } = result
-    response = new Response(responseBody, { status: httpStatusCode, headers: responseHeaders })
+  if ('response' in result) {
+    response = result.response
   } else if ('responseStream' in result) {
-    const { successStatus: status, responseHeaders: headers, responseStream, disableKey, waitCompletion } = result
+    const { successStatus: status, responseHeaders: headers, responseStream, disableKey, onStreamComplete } = result
     runAfter(
       ctx,
       'recordSpend',
       (async () => {
-        await waitCompletion
-        const cost = proxy.cost
-        if (cost) {
-          await recordSpend(apiKeyInfo, cost, options)
-        } else {
+        const complete = await onStreamComplete
+        if ('cost' in complete && complete.cost) {
+          await recordSpend(apiKeyInfo, complete.cost, options)
+        } else if ('error' in complete) {
           const { key: _key, ...context } = apiKeyInfo
           if (disableKey) {
-            logfire.error('api key blocked', { context, error: 'Unable to calculate cost' })
-            await blockApiKey(apiKeyInfo, options, 'Unable to calculate cost')
+            logfire.reportError('api key blocked', complete.error, { context })
+            await blockApiKey(apiKeyInfo, options, JSON.stringify(complete.error))
+          } else {
+            logfire.reportError('Unable to calculate cost', complete.error, { context })
           }
-          logfire.error('Unable to calculate cost', { context, error: 'Unable to calculate cost' })
         }
         await otel.send()
       })(),
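
The callback above hinges on the new result union. A minimal, self-contained sketch of the narrowing pattern (settleStream and its logging are hypothetical stand-ins; the { cost?: number } | { error: Error } union and the `in`-operator checks mirror the diff):

type StreamOutcome = { cost?: number } | { error: Error }

// Hypothetical consumer mirroring the gateway's recordSpend callback.
// The `in` operator narrows the union, so each branch sees the right shape.
async function settleStream(onStreamComplete: Promise<StreamOutcome>): Promise<void> {
  const complete = await onStreamComplete
  if ('cost' in complete && complete.cost) {
    console.log(`record spend: ${complete.cost}`)
  } else if ('error' in complete) {
    console.error('unable to calculate cost', complete.error)
  }
}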

gateway/src/otel/attributes.ts

Lines changed: 19 additions & 0 deletions
@@ -58,6 +58,25 @@ export function genAiOtelAttributes(
   return [spanName, attributes, level]
 }

+export function attributesFromRequest(request: Request): Attributes {
+  return {
+    'http.request.method': request.method,
+    'url.full': request.url,
+    ...Object.fromEntries(
+      Array.from(request.headers.entries()).map(([name, value]) => [`http.request.header.${name}`, value]),
+    ),
+  }
+}
+
+export function attributesFromResponse(response: Response): Attributes {
+  return {
+    'http.response.status_code': response.status,
+    ...Object.fromEntries(
+      Array.from(response.headers.entries()).map(([name, value]) => [`http.response.header.${name}`, value]),
+    ),
+  }
+}
+
 /** Semantic conventions for Generative AI
  * @see https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/
  */
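
Both helpers emit OpenTelemetry HTTP semantic-convention attributes (http.request.method, url.full, http.response.status_code, plus one http.*.header.* entry per header). A quick usage sketch, assuming the helper is imported from the module above and Request is the standard Fetch type:

import { attributesFromRequest } from './otel/attributes' // import path assumed

const request = new Request('https://example.com/anthropic/v1/messages?beta=true', {
  method: 'POST',
  headers: { 'anthropic-version': '2023-06-01', 'content-type': 'application/json' },
})

console.log(attributesFromRequest(request))
// {
//   'http.request.method': 'POST',
//   'url.full': 'https://example.com/anthropic/v1/messages?beta=true',
//   'http.request.header.anthropic-version': '2023-06-01',
//   'http.request.header.content-type': 'application/json',
// }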

gateway/src/providers/default.ts

Lines changed: 35 additions & 38 deletions
@@ -12,7 +12,7 @@ import type { GatewayOptions } from '..'
 import type { ModelAPI } from '../api'
 import type { BaseAPI } from '../api/base'
 import type { OtelSpan } from '../otel'
-import type { GenAIAttributes } from '../otel/attributes'
+import { attributesFromRequest, attributesFromResponse, type GenAIAttributes } from '../otel/attributes'
 import type { ApiKeyInfo, ProviderProxy } from '../types'
 import { runAfter } from '../utils'

@@ -29,10 +29,7 @@ export interface ProxySuccess {
 }

 export interface ProxyWhitelistedEndpoint {
-  requestBody: string
-  httpStatusCode: number
-  responseHeaders: Headers
-  responseBody: string
+  response: Response
 }

 export interface ProxyStreamingSuccess {
@@ -42,7 +39,7 @@ export interface ProxyStreamingSuccess {
   responseHeaders: Headers
   responseStream: ReadableStream
   otelAttributes?: GenAIAttributes
-  waitCompletion: Promise<void>
+  onStreamComplete: Promise<{ cost?: number } | { error: Error }>
   // In case we get to the end of the response, and we are unable to calculate the cost, we need to know if we can disable the key.
   disableKey?: boolean
 }
@@ -297,27 +294,16 @@ export class DefaultProviderProxy {
     const response = await this.fetch(url, { method, headers: requestHeaders, body: requestBodyText })

     if (this.isWhitelistedEndpoint()) {
-      // TODO(Marcelo): We can't read the body if it's a streaming response.
-      const responseBody = await response.text()
-      const { headers, status } = response
       this.otelSpan.end(
         `${this.request.method} ${this.restOfPath}`,
         {
-          'http.method': this.request.method,
-          'http.url': this.restOfPath,
-          'http.response.status_code': status,
+          ...attributesFromRequest(this.request),
+          ...attributesFromResponse(response),
           'http.request.body.text': requestBodyText,
-          'http.response.body.text': responseBody,
-          ...Object.fromEntries(
-            Array.from(requestHeaders.entries()).map(([name, value]) => [`http.request.header.${name}`, value]),
-          ),
-          ...Object.fromEntries(
-            Array.from(headers.entries()).map(([name, value]) => [`http.response.header.${name}`, value]),
-          ),
         },
         { level: 'info' },
       )
-      return { requestBody: requestBodyText, httpStatusCode: status, responseHeaders: headers, responseBody }
+      return { response }
     }

     // Each provider should be able to modify the response headers, e.g. remove openai org
@@ -402,44 +388,55 @@ export class DefaultProviderProxy {

     // Track completion but don't wait for it before returning
     this.runAfter('extract-stream', extractionPromise)
-    const waitCompletion = extractionPromise.catch(() => {}) // Swallow errors, already logged
+
+    const onStreamComplete = extractionPromise
+      .then((result) => {
+        // TODO(Marcelo): I think we actually need to emit 2 spans: one for HTTP, and another for the LLM.
+        this.otelSpan.end(
+          `chat ${modelAPI.extractedRequest?.requestModel ?? 'streaming'}`,
+          {
+            ...modelAPI.toGenAiOtelAttributes(),
+            ...attributesFromRequest(this.request),
+            ...attributesFromResponse(response),
+          },
+          { level: 'info' },
+        )
+
+        return result
+      })
+      .catch() // Swallow errors, already logged

     return {
       requestModel,
       requestBody: requestBodyText,
       successStatus: response.status,
       responseHeaders,
       responseStream,
-      waitCompletion,
+      onStreamComplete,
     }
   }

-  private async processChunks<T>(modelAPI: BaseAPI<unknown, unknown, T>, events: AsyncIterable<T>): Promise<void> {
+  private async processChunks<T>(
+    modelAPI: BaseAPI<unknown, unknown, T>,
+    events: AsyncIterable<T>,
+  ): Promise<{ cost?: number } | { error: Error }> {
     for await (const chunk of events) {
       modelAPI.processChunk(chunk)
     }

-    this.otelSpan.end(
-      `chat ${modelAPI.extractedRequest?.requestModel ?? 'streaming'}`,
-      // TODO(Marcelo): Missing the HTTP attributes - Should we pass them around or store in ModelAPI?
-      { ...modelAPI.toGenAiOtelAttributes() },
-      { level: 'info' },
-    )
-
     const provider = this.usageProvider()
     const usage = modelAPI.extractedResponse.usage
     const responseModel = modelAPI.extractedResponse.responseModel

     if (!provider || !usage || !responseModel) {
-      logfire.warning('Unable to calculate cost', { provider, usage, responseModel })
+      return { error: new Error(`Unable to calculate cost for model ${responseModel}`) }
+    }
+
+    const price = calcPrice(usage, responseModel, { provider })
+    if (price) {
+      return { cost: price.total_price }
     } else {
-      const price = calcPrice(usage, responseModel, { providerId: this.providerId() })
-      if (price) {
-        this.cost = price.total_price
-        logfire.info('cost {cost}', { cost: this.cost, usage, responseModel })
-      } else {
-        logfire.warning('Unable to calculate cost', { provider, usage, responseModel })
-      }
+      return { error: new Error(`Unable to calculate cost for model ${responseModel} and provider ${provider.name}`) }
     }
   }

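The producer side of the refactor is processChunks: it now returns the same { cost?: number } | { error: Error } union instead of logging and mutating this.cost as a side effect. A standalone sketch of that contract, with a hypothetical Usage type and a flat per-token price standing in for the gateway's ModelAPI/calcPrice internals:

interface Usage {
  inputTokens: number
  outputTokens: number
}

// Hypothetical stand-in for processChunks: drain the stream, then resolve
// to either a cost or an error, never throwing past this boundary.
async function tallyStream(
  events: AsyncIterable<Usage>,
  pricePerToken: number,
): Promise<{ cost?: number } | { error: Error }> {
  let usage: Usage | undefined
  for await (const chunk of events) {
    usage = chunk // the real code delegates to modelAPI.processChunk(chunk)
  }
  if (!usage) {
    return { error: new Error('Unable to calculate cost: no usage data') }
  }
  return { cost: (usage.inputTokens + usage.outputTokens) * pricePerToken }
}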

gateway/test/providers/anthropic.spec.ts.snap

Lines changed: 109 additions & 1 deletion
@@ -1130,7 +1130,7 @@ exports[`anthropic > should call anthropic via gateway with stream > span 1`] =
 {
   "key": "logfire.json_schema",
   "value": {
-    "stringValue": "{\"type\":\"object\",\"properties\":{\"gen_ai.system\":{\"type\":\"string\"},\"gen_ai.operation.name\":{\"type\":\"string\"},\"gen_ai.request.model\":{\"type\":\"string\"},\"gen_ai.request.max_tokens\":{\"type\":\"number\"},\"gen_ai.response.model\":{\"type\":\"string\"},\"gen_ai.response.id\":{\"type\":\"string\"},\"gen_ai.usage.input_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_read_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_write_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_tokens\":{\"type\":\"number\"}}}",
+    "stringValue": "{\"type\":\"object\",\"properties\":{\"gen_ai.system\":{\"type\":\"string\"},\"gen_ai.operation.name\":{\"type\":\"string\"},\"gen_ai.request.model\":{\"type\":\"string\"},\"gen_ai.request.max_tokens\":{\"type\":\"number\"},\"gen_ai.response.model\":{\"type\":\"string\"},\"gen_ai.response.id\":{\"type\":\"string\"},\"gen_ai.usage.input_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_read_tokens\":{\"type\":\"number\"},\"gen_ai.usage.cache_write_tokens\":{\"type\":\"number\"},\"gen_ai.usage.output_tokens\":{\"type\":\"number\"},\"http.request.method\":{\"type\":\"string\"},\"url.full\":{\"type\":\"string\"},\"http.request.header.accept\":{\"type\":\"string\"},\"http.request.header.anthropic-version\":{\"type\":\"string\"},\"http.request.header.authorization\":{\"type\":\"string\"},\"http.request.header.content-type\":{\"type\":\"string\"},\"http.request.header.user-agent\":{\"type\":\"string\"},\"http.request.header.x-stainless-arch\":{\"type\":\"string\"},\"http.request.header.x-stainless-lang\":{\"type\":\"string\"},\"http.request.header.x-stainless-os\":{\"type\":\"string\"},\"http.request.header.x-stainless-package-version\":{\"type\":\"string\"},\"http.request.header.x-stainless-retry-count\":{\"type\":\"string\"},\"http.request.header.x-stainless-runtime\":{\"type\":\"string\"},\"http.request.header.x-stainless-runtime-version\":{\"type\":\"string\"},\"http.request.header.x-stainless-timeout\":{\"type\":\"string\"},\"http.response.status_code\":{\"type\":\"number\"},\"http.response.header.server\":{\"type\":\"string\"},\"http.response.header.transfer-encoding\":{\"type\":\"string\"}}}",
   },
 },
 {
@@ -1199,5 +1199,113 @@ exports[`anthropic > should call anthropic via gateway with stream > span 1`] =
     "intValue": 10,
   },
 },
+{
+  "key": "http.request.method",
+  "value": {
+    "stringValue": "POST",
+  },
+},
+{
+  "key": "url.full",
+  "value": {
+    "stringValue": "https://example.com/anthropic/v1/messages?beta=true",
+  },
+},
+{
+  "key": "http.request.header.accept",
+  "value": {
+    "stringValue": "application/json",
+  },
+},
+{
+  "key": "http.request.header.anthropic-version",
+  "value": {
+    "stringValue": "2023-06-01",
+  },
+},
+{
+  "key": "http.request.header.authorization",
+  "value": {
+    "stringValue": "Bearer healthy",
+  },
+},
+{
+  "key": "http.request.header.content-type",
+  "value": {
+    "stringValue": "application/json",
+  },
+},
+{
+  "key": "http.request.header.user-agent",
+  "value": {
+    "stringValue": "Anthropic/JS 0.62.0",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-arch",
+  "value": {
+    "stringValue": "unknown",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-lang",
+  "value": {
+    "stringValue": "js",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-os",
+  "value": {
+    "stringValue": "Unknown",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-package-version",
+  "value": {
+    "stringValue": "0.62.0",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-retry-count",
+  "value": {
+    "stringValue": "0",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-runtime",
+  "value": {
+    "stringValue": "unknown",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-runtime-version",
+  "value": {
+    "stringValue": "unknown",
+  },
+},
+{
+  "key": "http.request.header.x-stainless-timeout",
+  "value": {
+    "stringValue": "600",
+  },
+},
+{
+  "key": "http.response.status_code",
+  "value": {
+    "intValue": 200,
+  },
+},
+{
+  "key": "http.response.header.server",
+  "value": {
+    "stringValue": "uvicorn",
+  },
+},
+{
+  "key": "http.response.header.transfer-encoding",
+  "value": {
+    "stringValue": "chunked",
+  },
+},
 ]
 `;
