Skip to content

Commit 8c2093c

Browse files
authored
Improve extraction (#107)
1 parent ee6394a commit 8c2093c

File tree

39 files changed

+1020
-1298
lines changed

39 files changed

+1020
-1298
lines changed

deploy/test/index.spec.ts.snap

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ exports[`deploy > should call openai via gateway > llm 1`] = `
1414
},
1515
},
1616
],
17-
"created": 1758119078,
18-
"id": "chatcmpl-CGnNGUOMdokZ0Ph0QUfUsK19MdByR",
17+
"created": 1761828474,
18+
"id": "chatcmpl-CWMMElxV7Z5jV4zs2g2cRQjZTsY8M",
1919
"model": "gpt-5-2025-08-07",
2020
"object": "chat.completion",
2121
"service_tier": "default",
@@ -118,7 +118,7 @@ exports[`deploy > should call openai via gateway > span 1`] = `
118118
{
119119
"key": "gen_ai.response.id",
120120
"value": {
121-
"stringValue": "chatcmpl-CGnNGUOMdokZ0Ph0QUfUsK19MdByR",
121+
"stringValue": "chatcmpl-CWMMElxV7Z5jV4zs2g2cRQjZTsY8M",
122122
},
123123
},
124124
{
@@ -293,7 +293,7 @@ exports[`deploy > should call openai via gateway > span 1`] = `
293293
{
294294
"key": "http.response.body.text",
295295
"value": {
296-
"stringValue": "{"id":"chatcmpl-CGnNGUOMdokZ0Ph0QUfUsK19MdByR","object":"chat.completion","created":1758119078,"model":"gpt-5-2025-08-07","choices":[{"index":0,"message":{"role":"assistant","content":"Paris.","refusal":null,"annotations":[]},"finish_reason":"stop"}],"usage":{"prompt_tokens":23,"completion_tokens":75,"total_tokens":98,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":64,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0},"pydantic_ai_gateway":{"cost_estimate":0.00077875}},"service_tier":"default","system_fingerprint":null}",
296+
"stringValue": "{"id":"chatcmpl-CWMMElxV7Z5jV4zs2g2cRQjZTsY8M","object":"chat.completion","created":1761828474,"model":"gpt-5-2025-08-07","choices":[{"index":0,"message":{"role":"assistant","content":"Paris.","refusal":null,"annotations":[]},"finish_reason":"stop"}],"usage":{"prompt_tokens":23,"completion_tokens":75,"total_tokens":98,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":64,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0},"pydantic_ai_gateway":{"cost_estimate":0.00077875}},"service_tier":"default","system_fingerprint":null}",
297297
},
298298
},
299299
{

examples/pai_openai_stream.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,41 @@
11
import asyncio
2+
from datetime import date
23

34
import logfire
5+
from pydantic import BaseModel, field_validator
46
from pydantic_ai import Agent
57

6-
logfire.configure(service_name='testing', send_to_logfire=False)
8+
logfire.configure(service_name='testing')
79
logfire.instrument_pydantic_ai()
810
logfire.instrument_httpx(capture_all=True)
911

12+
13+
class Person(BaseModel, use_attribute_docstrings=True):
14+
name: str
15+
"""The name of the person."""
16+
dob: date
17+
"""The date of birth of the person. MUST BE A VALID ISO 8601 date."""
18+
city: str
19+
"""The city where the person lives."""
20+
21+
@field_validator('dob')
22+
def validate_dob(cls, v: date) -> date:
23+
if v >= date(1900, 1, 1):
24+
raise ValueError('The person must be born in the 19th century')
25+
return v
26+
27+
1028
person_agent = Agent(
11-
'gateway:openai/gpt-5', instructions='You are a helpful assistant.', model_settings={'max_tokens': 1024}, retries=0
29+
'gateway:openai/gpt-5',
30+
instructions='You are a helpful assistant.',
31+
model_settings={'max_tokens': 1024},
32+
retries=2,
33+
output_type=Person,
1234
)
1335

1436

1537
async def main():
16-
async for event in person_agent.run_stream_events('What is the capital of France?'):
38+
async for event in person_agent.run_stream_events("Samuel lived in London and was born on Jan 28th '87"):
1739
print(repr(event))
1840

1941

gateway/src/api/anthropic.ts

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ import type {
22
BetaContentBlock,
33
BetaContentBlockParam,
44
BetaMessage,
5+
BetaRawMessageStreamEvent,
56
MessageCreateParams,
67
} from '@anthropic-ai/sdk/resources/beta'
78
import type { InputMessages, JsonValue, MessagePart, OutputMessages, TextPart } from '../otel/genai'
8-
import { BaseAPI } from './base'
9+
import { BaseAPI, type ExtractedRequest, type ExtractedResponse, type ExtractorConfig } from './base'
910

10-
// TODO(Marcelo): We use the beta API in PydanticAI, but does it matter here?
11-
12-
export class AnthropicAPI extends BaseAPI<MessageCreateParams, BetaMessage> {
11+
export class AnthropicAPI extends BaseAPI<MessageCreateParams, BetaMessage, BetaRawMessageStreamEvent> {
1312
requestStopSequences = (requestBody: MessageCreateParams): string[] | undefined => requestBody.stop_sequences
1413
requestTemperature = (requestBody: MessageCreateParams): number | undefined => requestBody.temperature
1514
requestTopK = (requestBody: MessageCreateParams): number | undefined => requestBody.top_k
@@ -51,6 +50,53 @@ export class AnthropicAPI extends BaseAPI<MessageCreateParams, BetaMessage> {
5150
},
5251
]
5352
}
53+
54+
// SafeExtractor implementation
55+
56+
requestExtractors: ExtractorConfig<MessageCreateParams, ExtractedRequest> = {
57+
requestModel: (requestBody: MessageCreateParams) => {
58+
this.extractedRequest.requestModel = requestBody.model ?? undefined
59+
},
60+
maxTokens: (requestBody: MessageCreateParams) => {
61+
this.extractedRequest.maxTokens = requestBody.max_tokens ?? undefined
62+
},
63+
temperature: (requestBody: MessageCreateParams) => {
64+
this.extractedRequest.temperature = requestBody.temperature ?? undefined
65+
},
66+
topK: (requestBody: MessageCreateParams) => {
67+
this.extractedRequest.topK = requestBody.top_k ?? undefined
68+
},
69+
topP: (requestBody: MessageCreateParams) => {
70+
this.extractedRequest.topP = requestBody.top_p ?? undefined
71+
},
72+
stopSequences: (requestBody: MessageCreateParams) => {
73+
this.extractedRequest.stopSequences = requestBody.stop_sequences ?? undefined
74+
},
75+
systemInstructions: (requestBody: MessageCreateParams) => {
76+
this.extractedRequest.systemInstructions = this.systemInstructions(requestBody)
77+
},
78+
}
79+
80+
chunkExtractors: ExtractorConfig<BetaRawMessageStreamEvent, ExtractedResponse> = {
81+
usage: (chunk: BetaRawMessageStreamEvent) => {
82+
if ('usage' in chunk && chunk.usage) {
83+
this.extractedResponse.usage = this.extractUsage(chunk)
84+
}
85+
},
86+
responseModel: (chunk: BetaRawMessageStreamEvent) => {
87+
if (chunk.type === 'message_start') {
88+
this.extractedResponse.responseModel = chunk.message.model
89+
}
90+
},
91+
responseId: (chunk: BetaRawMessageStreamEvent) => {
92+
if (chunk.type === 'message_start') {
93+
this.extractedResponse.responseId = chunk.message.id
94+
}
95+
},
96+
finishReasons: (_chunk: BetaRawMessageStreamEvent) => {},
97+
// TODO(Marcelo): We should implement this one.
98+
outputMessages: (_chunk: BetaRawMessageStreamEvent) => {},
99+
}
54100
}
55101

56102
function mapParts(content: string | BetaContentBlockParam[] | BetaContentBlock[]): MessagePart[] {

gateway/src/api/base.ts

Lines changed: 107 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,124 @@ import type { InputMessages, OutputMessages, TextPart } from '../otel/genai'
44
import { type JsonData, safe } from '../providers/default'
55
import type { ProviderID } from '../types'
66

7-
export abstract class BaseAPI<RequestBody, ResponseBody>
8-
implements GenAIAttributesExtractor<RequestBody, ResponseBody>
7+
export interface ExtractedRequest {
8+
requestModel?: string
9+
temperature?: number
10+
maxTokens?: number
11+
systemInstructions?: TextPart[]
12+
topP?: number
13+
topK?: number
14+
stopSequences?: string[]
15+
seed?: number
16+
inputMessages?: InputMessages
17+
}
18+
19+
export interface ExtractedResponse {
20+
responseModel: string
21+
responseId: string
22+
finishReasons: string[]
23+
outputMessages: OutputMessages
24+
usage: Usage
25+
}
26+
27+
export type FieldExtractor<Data> = (data: Data) => void
28+
29+
export type ExtractorConfig<Data, Target> = {
30+
[K in keyof Target]?: FieldExtractor<Data>
31+
}
32+
33+
export type ExtractedData = ExtractedRequest & ExtractedResponse
34+
35+
export interface SafeExtractor<RequestBody, ResponseBody, StreamChunk> {
36+
extractedRequest: ExtractedRequest
37+
extractedResponse: Partial<ExtractedResponse>
38+
39+
processRequest(request: RequestBody): void
40+
requestExtractors: ExtractorConfig<RequestBody, ExtractedRequest>
41+
42+
processResponse(response: ResponseBody): void
43+
44+
processChunk(chunk: StreamChunk): void
45+
chunkExtractors: ExtractorConfig<StreamChunk, ExtractedResponse>
46+
}
47+
48+
export abstract class BaseAPI<RequestBody, ResponseBody, StreamChunk = JsonData>
49+
implements GenAIAttributesExtractor<RequestBody, ResponseBody>, SafeExtractor<RequestBody, ResponseBody, StreamChunk>
950
{
1051
/** @apiFlavor: the flavor of the API, used to determine the response model and usage */
1152
apiFlavor: string | undefined = undefined
1253

1354
readonly providerId: ProviderID
1455
readonly requestModel?: string
1556

57+
extractedRequest: ExtractedRequest = {}
58+
extractedResponse: Partial<ExtractedResponse> = {}
59+
1660
constructor(providerId: ProviderID, requestModel?: string) {
1761
this.providerId = providerId
1862
this.requestModel = requestModel
1963
}
2064

21-
// TODO(Marcelo): This is not used anywhere yet! We should remove this note when we use it.
22-
extractUsage(responseBody: ResponseBody): Usage | undefined {
23-
const provider = findProvider({ providerId: this.providerId })
24-
if (!provider) {
25-
// This should never happen, but we will throw an error to be safe.
26-
throw new Error(`Provider not found for provider ID: ${this.providerId}`)
65+
requestExtractors: ExtractorConfig<RequestBody, ExtractedRequest> = {}
66+
chunkExtractors: ExtractorConfig<StreamChunk, ExtractedResponse> = {}
67+
68+
processRequest(request: RequestBody): void {
69+
for (const extractor of Object.values(this.requestExtractors)) {
70+
safe(extractor)(request)
71+
}
72+
}
73+
74+
processResponse(_response: ResponseBody): void {
75+
throw new Error('Method not implemented.')
76+
}
77+
78+
// This runs in O(K * N) time, where K is the number of chunkExtractors and N is the number of chunks.
79+
// Although this seems inefficient, K is a constant and N is typically small.
80+
// We do this because we want to ensure that we extract each field separately, so the logic of one of the extractors
81+
// doesn't cause another one to fail.
82+
processChunk(chunk: StreamChunk): void {
83+
for (const extractor of Object.values(this.chunkExtractors)) {
84+
safe(extractor)(chunk)
2785
}
86+
}
87+
88+
extractUsage(responseBody: ResponseBody | StreamChunk): Usage | undefined {
89+
const provider = findProvider({ providerId: this.providerId })
90+
// This should never happen because we know the provider ID is valid, but we will throw an error to be safe.
91+
if (!provider) throw new Error(`Provider not found for provider ID: ${this.providerId}`)
2892
const { usage } = extractUsage(provider, responseBody, this.apiFlavor)
2993
return usage
3094
}
3195

96+
toGenAiOtelAttributes(): GenAIAttributes {
97+
return omitUndefined({
98+
'gen_ai.system': this.providerId,
99+
'gen_ai.operation.name': 'chat',
100+
// Request Attributes
101+
'gen_ai.request.model': this.extractedRequest?.requestModel,
102+
'gen_ai.request.max_tokens': this.extractedRequest?.maxTokens,
103+
'gen_ai.request.temperature': this.extractedRequest?.temperature,
104+
'gen_ai.request.top_p': this.extractedRequest?.topP,
105+
'gen_ai.request.top_k': this.extractedRequest?.topK,
106+
'gen_ai.request.stop_sequences': this.extractedRequest?.stopSequences,
107+
'gen_ai.request.seed': this.extractedRequest?.seed,
108+
'gen_ai.system_instructions': this.extractedRequest?.systemInstructions,
109+
'gen_ai.input.messages': this.extractedRequest?.inputMessages,
110+
// Response Attributes
111+
'gen_ai.response.model': this.extractedResponse?.responseModel,
112+
'gen_ai.response.id': this.extractedResponse?.responseId,
113+
'gen_ai.response.finish_reasons': this.extractedResponse?.finishReasons,
114+
'gen_ai.output.messages': this.extractedResponse?.outputMessages,
115+
'gen_ai.usage.input_tokens': this.extractedResponse?.usage?.input_tokens,
116+
'gen_ai.usage.cache_read_tokens': this.extractedResponse?.usage?.cache_read_tokens,
117+
'gen_ai.usage.cache_write_tokens': this.extractedResponse?.usage?.cache_write_tokens,
118+
'gen_ai.usage.output_tokens': this.extractedResponse?.usage?.output_tokens,
119+
'gen_ai.usage.input_audio_tokens': this.extractedResponse?.usage?.input_audio_tokens,
120+
'gen_ai.usage.cache_audio_read_tokens': this.extractedResponse?.usage?.cache_audio_read_tokens,
121+
'gen_ai.usage.output_audio_tokens': this.extractedResponse?.usage?.output_audio_tokens,
122+
})
123+
}
124+
32125
// GenAIAttributesExtractor implementation
33126

34127
requestMaxTokens?: (requestBody: RequestBody) => number | undefined
@@ -45,6 +138,8 @@ export abstract class BaseAPI<RequestBody, ResponseBody>
45138

46139
extractOtelAttributes(requestBody: JsonData, responseBody: JsonData): GenAIAttributes {
47140
return {
141+
'gen_ai.system': this.providerId,
142+
'gen_ai.operation.name': 'chat',
48143
'gen_ai.request.max_tokens': this.genAIAttributes('requestMaxTokens', requestBody as RequestBody),
49144
'gen_ai.request.top_k': this.genAIAttributes('requestTopK', requestBody as RequestBody),
50145
'gen_ai.request.top_p': this.genAIAttributes('requestTopP', requestBody as RequestBody),
@@ -72,3 +167,7 @@ export abstract class BaseAPI<RequestBody, ResponseBody>
72167
return undefined
73168
}
74169
}
170+
171+
function omitUndefined<T extends Record<string, unknown>>(obj: T): Partial<T> {
172+
return Object.fromEntries(Object.entries(obj).filter(([_, v]) => v !== undefined)) as Partial<T>
173+
}

gateway/src/api/chat.ts

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
/** This module implements the OpenAI Chat Completion API.
22
* @see https://platform.openai.com/docs/api-reference/chat
33
*/
4+
45
import * as logfire from '@pydantic/logfire-api'
56
import mime from 'mime-types'
67
import type {
78
ChatCompletion,
9+
ChatCompletionChunk,
810
ChatCompletionCreateParams,
911
ChatCompletionMessageParam,
1012
} from 'openai/resources/chat/completions'
1113
import type { ChatMessage, InputMessages, MessagePart, OutputMessage, OutputMessages } from '../otel/genai'
12-
import { BaseAPI } from './base'
14+
import { BaseAPI, type ExtractedRequest, type ExtractedResponse, type ExtractorConfig } from './base'
1315

14-
export class ChatCompletionAPI extends BaseAPI<ChatCompletionCreateParams, ChatCompletion> {
16+
export class ChatCompletionAPI extends BaseAPI<ChatCompletionCreateParams, ChatCompletion, ChatCompletionChunk> {
1517
apiFlavor = 'chat'
1618

1719
requestStopSequences = (requestBody: ChatCompletionCreateParams): string[] | undefined => {
@@ -41,6 +43,56 @@ export class ChatCompletionAPI extends BaseAPI<ChatCompletionCreateParams, ChatC
4143
outputMessages = (responseBody: ChatCompletion): OutputMessages | undefined => {
4244
return responseBody.choices.map(mapOutputMessage)
4345
}
46+
47+
// SafeExtractor implementation
48+
49+
requestExtractors: ExtractorConfig<ChatCompletionCreateParams, ExtractedRequest> = {
50+
requestModel: (requestBody: ChatCompletionCreateParams) => {
51+
this.extractedRequest.requestModel = requestBody.model ?? undefined
52+
},
53+
maxTokens: (requestBody: ChatCompletionCreateParams) => {
54+
this.extractedRequest.maxTokens = requestBody.max_completion_tokens ?? undefined
55+
},
56+
temperature: (requestBody: ChatCompletionCreateParams) => {
57+
this.extractedRequest.temperature = requestBody.temperature ?? undefined
58+
},
59+
topP: (requestBody: ChatCompletionCreateParams) => {
60+
this.extractedRequest.topP = requestBody.top_p ?? undefined
61+
},
62+
stopSequences: (requestBody: ChatCompletionCreateParams) => {
63+
this.extractedRequest.stopSequences =
64+
typeof requestBody.stop === 'string' ? [requestBody.stop] : (requestBody.stop ?? undefined)
65+
},
66+
}
67+
68+
chunkExtractors: ExtractorConfig<ChatCompletionChunk, ExtractedResponse> = {
69+
usage: (chunk: ChatCompletionChunk) => {
70+
if ('usage' in chunk && chunk.usage) {
71+
this.extractedResponse.usage = this.extractUsage(chunk)
72+
}
73+
},
74+
responseModel: (chunk: ChatCompletionChunk) => {
75+
if ('model' in chunk && chunk.model) {
76+
this.extractedResponse.responseModel = chunk.model
77+
}
78+
},
79+
responseId: (chunk: ChatCompletionChunk) => {
80+
if ('id' in chunk && chunk.id) {
81+
this.extractedResponse.responseId = chunk.id
82+
}
83+
},
84+
finishReasons: (chunk: ChatCompletionChunk) => {
85+
const finishReasons: string[] = []
86+
for (const choice of chunk.choices) {
87+
if (choice.finish_reason) {
88+
finishReasons.push(choice.finish_reason)
89+
}
90+
}
91+
this.extractedResponse.finishReasons = finishReasons.length > 0 ? finishReasons : undefined
92+
},
93+
// TODO(Marcelo): We should implement this one.
94+
outputMessages: (_chunk: ChatCompletionChunk) => {},
95+
}
4496
}
4597

4698
export function mapInputMessage(message: ChatCompletionMessageParam): ChatMessage {

0 commit comments

Comments
 (0)