6 changes: 6 additions & 0 deletions models/spring-ai-openai/pom.xml
@@ -117,6 +117,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
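The reactor-test addition presumably backs StepVerifier-based tests for the streaming error handling introduced below. A minimal sketch of such a test; the class name and stream contents are illustrative, and a real test would drive OpenAiApi.chatCompletionStream(...) against a MockWebServer:

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class StreamErrorHandlingSketchTest {

	@Test
	void emitsOnlyParsedChunks() {
		// Stand-in for the parsed chunk stream (illustrative data).
		Flux<String> parsedChunks = Flux.just("chunk-1", "chunk-2");

		// reactor-test's StepVerifier asserts each emission and completion.
		StepVerifier.create(parsedChunks)
			.expectNext("chunk-1", "chunk-2")
			.verifyComplete();
	}

}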
49 changes: 49 additions & 0 deletions models/spring-ai-openai/src/main/java/org/springframework/ai/openai/api/ChatCompletionParseException.java
@@ -0,0 +1,49 @@
/*
* Copyright 2023-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.ai.openai.api;

/**
* Exception thrown when a ChatCompletionChunk cannot be parsed from a streaming response.
* This typically occurs when the LLM returns malformed JSON.
*
* @author Liu Guodong
* @since 1.0.0
*/
public class ChatCompletionParseException extends RuntimeException {

private final String rawContent;

/**
* Constructs a new ChatCompletionParseException.
* @param message the detail message
* @param rawContent the raw content that failed to parse
* @param cause the cause of the parsing failure
*/
public ChatCompletionParseException(String message, String rawContent, Throwable cause) {
super(message, cause);
this.rawContent = rawContent;
}

/**
* Returns the raw content that failed to parse.
* @return the raw content string
*/
public String getRawContent() {
return this.rawContent;
}

}
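When the FAIL_FAST strategy defined later in this PR propagates the exception, callers can pull the offending payload out of it. A hedged sketch, assuming chunkFlux is the Flux<ChatCompletionChunk> returned by chatCompletionStream(...) and logger is an SLF4J logger in scope:

// Recover the raw payload and degrade gracefully instead of failing the call.
Flux<ChatCompletionChunk> resilient = chunkFlux
	.onErrorResume(ChatCompletionParseException.class, ex -> {
		logger.warn("Unparseable chunk: {}", ex.getRawContent());
		return Flux.empty();
	});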
models/spring-ai-openai/src/main/java/org/springframework/ai/openai/api/OpenAiApi.java
@@ -70,6 +70,8 @@
*/
public class OpenAiApi {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OpenAiApi.class);

public static final String HTTP_USER_AGENT_HEADER = "User-Agent";

public static final String SPRING_AI_USER_AGENT = "spring-ai";
@@ -116,6 +118,8 @@ public static Builder builder() {

private OpenAiStreamFunctionCallingHelper chunkMerger = new OpenAiStreamFunctionCallingHelper();

private StreamErrorHandlingStrategy streamErrorHandlingStrategy = StreamErrorHandlingStrategy.SKIP;

/**
* Create a new chat completion api.
* @param baseUrl api base URL.
@@ -245,16 +249,29 @@ public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chat
.headers(headers -> {
headers.addAll(additionalHttpHeader);
addDefaultHeadersIfMissing(headers);
-}) // @formatter:on
+})
.body(Mono.just(chatRequest), ChatCompletionRequest.class)
.retrieve()
.bodyToFlux(String.class)
// Split by newlines to handle multi-line responses (common in tests with MockWebServer)
.flatMap(content -> Flux.fromArray(content.split("\\r?\\n")))
// Filter out empty lines
.filter(line -> !line.trim().isEmpty())
// cancels the flux stream after the "[DONE]" is received.
.takeUntil(SSE_DONE_PREDICATE)
// filters out the "[DONE]" message.
.filter(SSE_DONE_PREDICATE.negate())
-.map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
-// Detect is the chunk is part of a streaming function call.
+// Parse JSON string to ChatCompletionChunk with error handling
+.flatMap(content -> {
+try {
+ChatCompletionChunk chunk = ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class);
+return Mono.just(chunk);
+}
+catch (Exception e) {
+return handleParseError(content, e);
+}
+})
+// Detect if the chunk is part of a streaming function call.
.map(chunk -> {
if (this.chunkMerger.isStreamingToolFunctionCall(chunk)) {
isInsideTool.set(true);
@@ -276,12 +293,52 @@ public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chat
// Flux<Flux<ChatCompletionChunk>> -> Flux<Mono<ChatCompletionChunk>>
.concatMapIterable(window -> {
Mono<ChatCompletionChunk> monoChunk = window.reduce(
new ChatCompletionChunk(null, null, null, null, null, null, null, null),
(previous, current) -> this.chunkMerger.merge(previous, current));
return List.of(monoChunk);
})
// Flux<Mono<ChatCompletionChunk>> -> Flux<ChatCompletionChunk>
.flatMap(mono -> mono);
// @formatter:on
}
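The pre-parsing stages above can be exercised in isolation. A standalone sketch, assuming only Reactor and java.util.function.Predicate; the done predicate mirrors SSE_DONE_PREDICATE in spirit and the payloads are illustrative:

Predicate<String> done = "[DONE]"::equals;

Flux<String> lines = Flux.just("{\"id\":\"1\"}\n\n{\"id\":\"2\"}", "[DONE]")
	// Split multi-line frames (as delivered by e.g. MockWebServer).
	.flatMap(frame -> Flux.fromArray(frame.split("\\r?\\n")))
	// Drop blank lines left over from the split.
	.filter(line -> !line.trim().isEmpty())
	// Stop at the terminator, then discard the terminator itself.
	.takeUntil(done)
	.filter(done.negate());
// Emits {"id":"1"} then {"id":"2"}, ready for jsonToObject(...).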

/**
* Handles parsing errors when processing streaming chat completion chunks. The
* behavior depends on the configured {@link StreamErrorHandlingStrategy}.
* @param content the raw content that failed to parse
* @param e the exception that occurred during parsing
* @return a Mono that completes empty (SKIP and LOG_AND_CONTINUE) or signals an
* error (FAIL_FAST)
*/
private Mono<ChatCompletionChunk> handleParseError(String content, Exception e) {
String errorMessage = String.format(
"Failed to parse ChatCompletionChunk from streaming response. "
+ "Raw content: [%s]. This may indicate malformed JSON from the LLM. Error: %s",
content, e.getMessage());

switch (this.streamErrorHandlingStrategy) {
case FAIL_FAST:
logger.error(errorMessage, e);
return Mono.error(new ChatCompletionParseException("Invalid JSON chunk received from LLM", content, e));

case LOG_AND_CONTINUE:
logger.warn(errorMessage);
logger.debug("Full stack trace for JSON parsing error:", e);
return Mono.empty();

case SKIP:
default:
logger.warn("Skipping invalid chunk in streaming response. Raw content: [{}]. Error: {}", content,
e.getMessage());
return Mono.empty();
}
}

/**
* Sets the error handling strategy for streaming chat completion parsing errors.
* @param strategy the strategy to use when encountering JSON parsing errors
*/
public void setStreamErrorHandlingStrategy(StreamErrorHandlingStrategy strategy) {
this.streamErrorHandlingStrategy = strategy != null ? strategy : StreamErrorHandlingStrategy.SKIP;
}
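A short usage sketch for the setter, assuming api is an existing OpenAiApi instance:

// Switch an existing instance to fail-fast parsing.
api.setStreamErrorHandlingStrategy(StreamErrorHandlingStrategy.FAIL_FAST);

// A null argument falls back to the SKIP default rather than throwing.
api.setStreamErrorHandlingStrategy(null);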

/**
@@ -2006,6 +2063,7 @@ public Builder(OpenAiApi api) {
this.restClientBuilder = api.restClient != null ? api.restClient.mutate() : RestClient.builder();
this.webClientBuilder = api.webClient != null ? api.webClient.mutate() : WebClient.builder();
this.responseErrorHandler = api.getResponseErrorHandler();
this.streamErrorHandlingStrategy = api.streamErrorHandlingStrategy;
}

private String baseUrl = OpenAiApiConstants.DEFAULT_BASE_URL;
@@ -2024,6 +2082,8 @@ public Builder(OpenAiApi api) {

private ResponseErrorHandler responseErrorHandler = RetryUtils.DEFAULT_RESPONSE_ERROR_HANDLER;

private StreamErrorHandlingStrategy streamErrorHandlingStrategy = StreamErrorHandlingStrategy.SKIP;

public Builder baseUrl(String baseUrl) {
Assert.hasText(baseUrl, "baseUrl cannot be null or empty");
this.baseUrl = baseUrl;
@@ -2077,10 +2137,18 @@ public Builder responseErrorHandler(ResponseErrorHandler responseErrorHandler) {
return this;
}

public Builder streamErrorHandlingStrategy(StreamErrorHandlingStrategy streamErrorHandlingStrategy) {
Assert.notNull(streamErrorHandlingStrategy, "streamErrorHandlingStrategy cannot be null");
this.streamErrorHandlingStrategy = streamErrorHandlingStrategy;
return this;
}

public OpenAiApi build() {
Assert.notNull(this.apiKey, "apiKey must be set");
-return new OpenAiApi(this.baseUrl, this.apiKey, this.headers, this.completionsPath, this.embeddingsPath,
-this.restClientBuilder, this.webClientBuilder, this.responseErrorHandler);
+OpenAiApi api = new OpenAiApi(this.baseUrl, this.apiKey, this.headers, this.completionsPath,
+this.embeddingsPath, this.restClientBuilder, this.webClientBuilder, this.responseErrorHandler);
+api.setStreamErrorHandlingStrategy(this.streamErrorHandlingStrategy);
+return api;
}

}
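Taken together, construction-time configuration might look like the following sketch; the apiKey(String) overload and the environment variable are assumptions, not shown in this diff:

OpenAiApi api = OpenAiApi.builder()
	.apiKey(System.getenv("OPENAI_API_KEY")) // placeholder credential source
	.streamErrorHandlingStrategy(StreamErrorHandlingStrategy.LOG_AND_CONTINUE)
	.build();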
49 changes: 49 additions & 0 deletions models/spring-ai-openai/src/main/java/org/springframework/ai/openai/api/StreamErrorHandlingStrategy.java
@@ -0,0 +1,49 @@
/*
* Copyright 2023-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.ai.openai.api;

/**
* Strategy for handling JSON parsing errors in streaming chat completions. This is
* particularly useful when dealing with LLMs that may return malformed JSON, such as
* Qwen3-8B or other custom models.
*
* @author Liu Guodong
* @since 1.0.0
*/
public enum StreamErrorHandlingStrategy {

/**
* Skip invalid chunks and continue processing the stream. This is the default and
* recommended strategy for production use. Invalid chunks are logged but do not
* interrupt the stream.
*/
SKIP,

/**
* Fail immediately when encountering an invalid chunk. The error is propagated
* through the reactive stream, terminating the stream processing.
*/
FAIL_FAST,

/**
* Log the error and continue processing. Similar to SKIP but with more detailed
* logging. Use this for debugging or when you want to monitor the frequency of
* parsing errors.
*/
LOG_AND_CONTINUE

}
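Since FAIL_FAST terminates the stream with an error signal, the reactor-test dependency added in this PR can assert it directly. A sketch, assuming chunkFlux streams from an endpoint that emits malformed JSON:

// Expect the stream to terminate with the new parse exception.
StepVerifier.create(chunkFlux)
	.expectError(ChatCompletionParseException.class)
	.verify();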