Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cpp/include/openvino/genai/json_container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ class OPENVINO_GENAI_EXPORTS JsonContainer {
*/
static JsonContainer from_json_string(const std::string& json_str);

/**
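* @brief Merge src into dst. Keys missing from dst are copied from src as-is;
* for keys present in both containers the src string is appended to the dst string.
* @param dst Destination JsonContainer to append to
* @param src Source JsonContainer to append from
* @throw ov::Exception if a key is present in both containers and either of its values is not a string.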
*/
static void concatenate(JsonContainer& dst, const JsonContainer& src);

/**
* @brief Create JsonContainer as an empty JSON object.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/include/openvino/genai/parsers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class OPENVINO_GENAI_EXPORTS IncrementalParser {
* @return std::string Filtered text that should be added to the content
*/
virtual std::string parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens = std::nullopt
) = 0;
Expand Down Expand Up @@ -222,7 +222,7 @@ class OPENVINO_GENAI_EXPORTS ReasoningIncrementalParser : public IncrementalPars
* @return std::string Filtered text with reasoning content processed according to configuration
Copy link
Contributor Author

@pavel-esir pavel-esir Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the doc comment: rename `message` to `delta_message` in the line "@param message JsonContainer to store parsed results and reasoning metadata".

*/
std::string parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens = std::nullopt
) override;
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/src/json_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,5 +409,29 @@ void* JsonContainer::_get_json_value_ptr() const {
return m_impl->get_json_value_ptr(m_path, AccessMode::Read);
}

/**
 * @brief Merge the top-level entries of `src` into `dst`.
 *
 * Keys that `dst` does not have yet are copied from `src` as-is (any value type).
 * For keys present in both containers both values must be strings, and the
 * `src` string is appended to the `dst` string.
 *
 * @param dst Destination container, modified in place.
 * @param src Source container, left unchanged.
 * @throw ov::Exception if either container does not reference a JSON value, if a non-empty
 *        `src` is not a JSON object, if `dst` is neither a JSON object nor null, or if a
 *        key shared by both containers holds a non-string value on either side.
 */
void JsonContainer::concatenate(JsonContainer& dst, const JsonContainer& src) {
    auto* dst_json = static_cast<nlohmann::ordered_json*>(dst._get_json_value_ptr());
    const auto* src_json = static_cast<const nlohmann::ordered_json*>(src._get_json_value_ptr());
    OPENVINO_ASSERT(
        dst_json != nullptr && src_json != nullptr,
        "JsonContainer concatenate requires both containers to reference a JSON value."
    );

    // Nothing to merge (null, empty object or empty array): leave dst untouched.
    if (src_json->empty()) {
        return;
    }

    // Report unsupported shapes as ov::Exception instead of leaking a raw nlohmann
    // exception from it.key() / operator[] further below.
    OPENVINO_ASSERT(
        src_json->is_object(),
        "JsonContainer concatenate expects the source to be a JSON object, got: '", src_json->type_name(), "'."
    );
    OPENVINO_ASSERT(
        dst_json->is_object() || dst_json->is_null(),
        "JsonContainer concatenate expects the destination to be a JSON object, got: '", dst_json->type_name(), "'."
    );

    for (auto it = src_json->begin(); it != src_json->end(); ++it) {
        const auto& src_val = it.value();

        // Single lookup: find() returns end() both for a missing key and for a null destination.
        auto dst_it = dst_json->find(it.key());
        if (dst_it == dst_json->end()) {
            // New key: copy the source value verbatim, no type restriction applies.
            // operator[] turns a null destination into an object on first insertion.
            (*dst_json)[it.key()] = src_val;
            continue;
        }

        // Existing key: only string + string concatenation is defined.
        auto& dst_val = dst_it.value();
        OPENVINO_ASSERT(
            src_val.is_string() && dst_val.is_string(),
            "JsonContainer concatenate supports only string concatenation for object values. "
            "Key: '", it.key(), "', src_val type: '", src_val.type_name(), "', dst_val type: '", dst_val.type_name(), "'."
        );
        // Append in place instead of copying both strings into a temporary on every delta.
        dst_val.get_ref<std::string&>() += src_val.get_ref<const std::string&>();
    }
}

} // namespace genai
} // namespace ov
81 changes: 37 additions & 44 deletions src/cpp/src/parsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
bool m_think_tag_opened = false;
std::string m_text_cache = "";
bool m_deactivated = false;

/**
* @brief Ensure required fields exist in the message container.
*/
void ensure_message_fields(JsonContainer& message) {
if (!message.contains("reasoning_content")) {
message["reasoning_content"] = "";
}
if (!message.contains("content")) {
message["content"] = "";
}
}

/**
* @brief Find the longest suffix of text that is a prefix of the close tag.
Expand Down Expand Up @@ -61,8 +49,8 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
void handle_complete_reasoning(JsonContainer& message, std::string_view txt_chunk,
size_t open_idx, size_t close_idx, std::string& delta_text) {
// Extract reasoning content between tags
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(),
close_idx - (open_idx + m_open_tag.size())));
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size())));
Copy link

Copilot AI Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The calculation close_idx - (open_idx + m_open_tag.size()) is duplicated from the removed line. Consider extracting this to a variable for clarity, e.g., size_t reasoning_length = close_idx - (open_idx + m_open_tag.size());

Suggested change
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size())));
size_t reasoning_length = close_idx - (open_idx + m_open_tag.size());
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), reasoning_length));

Copilot uses AI. Check for mistakes.
message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
Expand All @@ -76,31 +64,37 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
/**
* @brief Handle the case where only the open tag is found.
*/
void handle_open_tag(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, size_t open_idx, std::string& delta_text) {
void handle_open_tag(JsonContainer& delta_message, std::string_view txt_chunk, size_t open_idx, std::string& delta_text) {
// Start accumulating reasoning content
reason_str.append(txt_chunk.substr(open_idx + m_open_tag.size()));
message["reasoning_content"] = std::move(reason_str);
delta_message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size()));

if (!m_keep_original_content) {
delta_text.clear();
} else {
delta_text = txt_chunk;
}

m_think_tag_opened = true;
m_text_cache.clear();
}

/**
* @brief Handle the case where the close tag is found.
*/
void handle_close_tag(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, size_t close_idx, std::string& delta_text) {
void handle_close_tag(JsonContainer& delta_message, std::string_view txt_chunk, size_t close_idx, std::string& delta_text) {
// Append text before close tag to reasoning content
reason_str.append(txt_chunk.substr(0, close_idx));
message["reasoning_content"] = std::move(reason_str);
delta_message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx)));
auto content = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
delta_message["content"] = content;

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
// Assigning to delta_text from txt_chunk is correct here, even though txt_chunk
// contains parts cached from previous calls that had not been processed yet:
// we kept them in the cache until we could decide what to do with them. At this point
// we know for certain that those cached parts belonged to reasoning_content, so we can discard them.
delta_text = content;
} else {
delta_text = txt_chunk;
}

m_text_cache.clear();
Expand All @@ -111,25 +105,27 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
/**
* @brief Handle accumulating text while inside reasoning tags.
*/
void handle_inside_reasoning(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, std::string& delta_text) {
void handle_inside_reasoning(JsonContainer& delta_message, std::string_view txt_chunk, std::string& delta_text) {
// Find if the end of txt_chunk might be the start of a close tag
const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk);

std::string reason_str;
if (num_chars_to_keep > 0) {
// Keep potential partial close tag in cache
m_text_cache = std::string(txt_chunk.substr(txt_chunk.size() - num_chars_to_keep));
reason_str.append(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep));
reason_str = txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep);
if (m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep));
}
} else {
// No partial close tag, accumulate all text
reason_str.append(txt_chunk);
reason_str = txt_chunk;
m_text_cache.clear();
}

delta_message["reasoning_content"] = std::move(reason_str);
if (!m_keep_original_content) {
delta_text.clear();
}
message["reasoning_content"] = std::move(reason_str);
}

public:
Expand All @@ -145,7 +141,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
m_close_tag(close_tag) {}

std::string parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens
) {
Expand All @@ -157,13 +153,8 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
}
m_first_run = false;

ensure_message_fields(message);

const std::string txt_chunk = m_text_cache + delta_text;
std::string reason_str;
if (message.contains("reasoning_content")) {
reason_str = std::move(message["reasoning_content"].get_string());
}
std::string txt_chunk = m_text_cache + delta_text;

// Cache find() results to avoid redundant searches
const auto open_idx = txt_chunk.find(m_open_tag);
Expand All @@ -175,20 +166,22 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
? close_idx : std::string::npos;

if (close_idx_after_open != std::string::npos) {
handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open, delta_text);
handle_complete_reasoning(delta_message, txt_chunk, open_idx, close_idx_after_open, delta_text);
} else {
handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text);
handle_open_tag(delta_message, txt_chunk, open_idx, delta_text);
}
} else if (m_think_tag_opened && close_idx != std::string::npos) {
handle_close_tag(message, reason_str, txt_chunk, close_idx, delta_text);
handle_close_tag(delta_message, txt_chunk, close_idx, delta_text);
} else if (m_think_tag_opened) {
handle_inside_reasoning(message, reason_str, txt_chunk, delta_text);
handle_inside_reasoning(delta_message, txt_chunk, delta_text);
} else {
// Think tag was not opened yet and not found in the current delta_text.
// Accumulate text in the cache to detect if <think> is split between several delta_text pieces.
m_text_cache += delta_text;
// Intentionally clear delta_text: no delta content is returned to the user during this phase
// (we are waiting for the <think> tag to be fully detected in the cache).
Comment on lines +181 to +182
Copy link

Copilot AI Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The comment above this line describes why delta_text is cleared, but it should be updated to reflect that this intentional clearing is part of the caching strategy while waiting for the full <think> tag.

Suggested change
// Intentionally clear delta_text: no delta content is returned to the user during this phase
// (we are waiting for the <think> tag to be fully detected in the cache).
// Intentionally clear delta_text as part of the caching strategy:
// no delta content is returned to the user during this phase because we are
// accumulating partial data in m_text_cache until the full <think> tag is detected.

Copilot uses AI. Check for mistakes.
delta_text.clear();
}

return delta_text;
}

Expand All @@ -207,11 +200,11 @@ ReasoningIncrementalParser::ReasoningIncrementalParser(bool expect_open_tag, boo
ReasoningIncrementalParser::~ReasoningIncrementalParser() = default;

std::string ReasoningIncrementalParser::parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens
) {
return m_impl->parse(message, delta_text, delta_tokens);
return m_impl->parse(delta_message, delta_text, delta_tokens);
}

void ReasoningIncrementalParser::reset() {
Expand Down
22 changes: 16 additions & 6 deletions src/cpp/src/text_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,21 @@ std::vector<std::shared_ptr<IncrementalParser>> m_parsers;
JsonContainer m_parsed_message;

TextParserStreamerImpl(std::vector<std::shared_ptr<IncrementalParser>> parsers) : m_parsers{parsers} {}

};

TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vector<std::shared_ptr<IncrementalParser>> parsers)
: TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant {
return this->write(s);
}), m_pimpl{std::make_unique<TextParserStreamerImpl>(parsers)} {}
}), m_pimpl{std::make_unique<TextParserStreamerImpl>(parsers)} {
m_pimpl->m_parsed_message["content"] = "";
}

CallbackTypeVariant TextParserStreamer::write(std::string message) {
CallbackTypeVariant TextParserStreamer::write(std::string delta_text) {
// When 'write' is called with string, it means new chunk of tokens is decoded into text

auto flushed_tokens = std::vector<int64_t>();
if (message.back() == '\n') {
if (delta_text.back() == '\n') {
// Flush all tokens
flushed_tokens.assign(m_tokens_cache.begin(), m_tokens_cache.end());
} else if (m_decoded_lengths.size() >= delay_n_tokens) {
Expand All @@ -177,13 +180,19 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) {
}
}

// Each time we start iterating over the incremental parsers we create a fresh delta_message.
// Parsers must neither delete nor overwrite existing fields; they may only append to fields or add new ones.
// The only field that is set automatically is "content": the resulting delta_text is stored there.
JsonContainer delta_message;
// Iterate over all parsers and apply them to the message
for (auto& parser: m_pimpl->m_parsers) {
message = parser->parse(m_pimpl->m_parsed_message, message, flushed_tokens);
delta_text = parser->parse(delta_message, delta_text, flushed_tokens);
// Message can be modified inside parser, if parser for example extracted tool calling from message content
m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message;
}
return write(m_pimpl->m_parsed_message);
delta_message["content"] = delta_text;

JsonContainer::concatenate(m_pimpl->m_parsed_message, delta_message);
return write(delta_message);
}

JsonContainer TextParserStreamer::get_parsed_message() const {
Expand All @@ -192,6 +201,7 @@ JsonContainer TextParserStreamer::get_parsed_message() const {

void TextParserStreamer::reset() {
m_pimpl->m_parsed_message = JsonContainer();
m_pimpl->m_parsed_message["content"] = "";
for (auto& parser : m_pimpl->m_parsers) {
parser->reset();
}
Expand Down
5 changes: 3 additions & 2 deletions tests/cpp/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ TEST_F(DeepSeekR1ReasoningParserTest, ReasoningContentAccumulatesAcrossCalls) {
std::string ref_res = "First, I recognize that the question is asking for the sum of 2 and 1.\n\nI know that addition involves combining two numbers to find their total.\n\nStarting with 2, I add 1 to it.\n\n2 plus 1 equals 3.\n";

JsonContainer msg;

JsonContainer accumulated_msg;
for (int i = 1; i < input_stream.size(); i++) {
std::string delta_text = input_stream[i];
delta_text = parser.parse(msg, delta_text);
JsonContainer::concatenate(accumulated_msg, msg);
}
ASSERT_EQ(msg["reasoning_content"], ref_res);
ASSERT_EQ(accumulated_msg["reasoning_content"], ref_res);
}

TEST(ParserTest, test_custom_parser) {
Expand Down
Loading
Loading