From 2be242f6b0299419a2fddf68af27e37261d77edf Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 17 Nov 2025 13:15:10 +0100 Subject: [PATCH 1/8] write chunks in TextParserStreamer --- src/cpp/src/parsers.cpp | 49 +++++++++--------------------- src/cpp/src/text_streamer.cpp | 26 ++++++++++++++-- tests/python_tests/test_parsers.py | 14 ++++----- 3 files changed, 45 insertions(+), 44 deletions(-) diff --git a/src/cpp/src/parsers.cpp b/src/cpp/src/parsers.cpp index d359fad2fc..8d16542bd9 100644 --- a/src/cpp/src/parsers.cpp +++ b/src/cpp/src/parsers.cpp @@ -59,14 +59,10 @@ class ReasoningIncrementalParser::ReasoningParserImpl { * @brief Handle the case where both open and close tags are found in the same chunk. */ void handle_complete_reasoning(JsonContainer& message, std::string_view txt_chunk, - size_t open_idx, size_t close_idx, std::string& delta_text) { + size_t open_idx, size_t close_idx) { // Extract reasoning content between tags - message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), - close_idx - (open_idx + m_open_tag.size()))); - - if (!m_keep_original_content) { - delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); - } + message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size()))); + message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); m_think_tag_opened = false; m_deactivated = true; @@ -77,15 +73,11 @@ class ReasoningIncrementalParser::ReasoningParserImpl { * @brief Handle the case where only the open tag is found. 
*/ void handle_open_tag(JsonContainer& message, std::string& reason_str, - std::string_view txt_chunk, size_t open_idx, std::string& delta_text) { + std::string_view txt_chunk, size_t open_idx) { // Start accumulating reasoning content reason_str.append(txt_chunk.substr(open_idx + m_open_tag.size())); message["reasoning_content"] = std::move(reason_str); - - if (!m_keep_original_content) { - delta_text.clear(); - } - + m_think_tag_opened = true; m_text_cache.clear(); } @@ -93,15 +85,10 @@ class ReasoningIncrementalParser::ReasoningParserImpl { /** * @brief Handle the case where the close tag is found. */ - void handle_close_tag(JsonContainer& message, std::string& reason_str, - std::string_view txt_chunk, size_t close_idx, std::string& delta_text) { + void handle_close_tag(JsonContainer& message, std::string_view txt_chunk, size_t close_idx) { // Append text before close tag to reasoning content - reason_str.append(txt_chunk.substr(0, close_idx)); - message["reasoning_content"] = std::move(reason_str); - - if (!m_keep_original_content) { - delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); - } + message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx))); + message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));; m_text_cache.clear(); m_think_tag_opened = false; @@ -111,8 +98,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { /** * @brief Handle accumulating text while inside reasoning tags. 
*/ - void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, - std::string_view txt_chunk, std::string& delta_text) { + void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, std::string_view txt_chunk) { // Find if the end of txt_chunk might be the start of a close tag const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk); @@ -126,9 +112,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl { m_text_cache.clear(); } - if (!m_keep_original_content) { - delta_text.clear(); - } message["reasoning_content"] = std::move(reason_str); } @@ -150,6 +133,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { const std::optional>& delta_tokens ) { if (m_deactivated) { + message["content"] = delta_text; return delta_text; } if (!m_expect_open_tag && m_first_run) { @@ -160,10 +144,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { ensure_message_fields(message); const std::string txt_chunk = m_text_cache + delta_text; - std::string reason_str; - if (message.contains("reasoning_content")) { - reason_str = std::move(message["reasoning_content"].get_string()); - } + std::string reason_str = std::move(message["reasoning_content"].get_string()); // Cache find() results to avoid redundant searches const auto open_idx = txt_chunk.find(m_open_tag); @@ -175,14 +156,14 @@ class ReasoningIncrementalParser::ReasoningParserImpl { ? 
close_idx : std::string::npos; if (close_idx_after_open != std::string::npos) { - handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open, delta_text); + handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open); } else { - handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text); + handle_open_tag(message, reason_str, txt_chunk, open_idx); } } else if (m_think_tag_opened && close_idx != std::string::npos) { - handle_close_tag(message, reason_str, txt_chunk, close_idx, delta_text); + handle_close_tag(message, txt_chunk, close_idx); } else if (m_think_tag_opened) { - handle_inside_reasoning(message, reason_str, txt_chunk, delta_text); + handle_inside_reasoning(message, reason_str, txt_chunk); } else { // Think tag was not opened yet and not found in the current delta_text. // Accumulate text in the cache to detect if is split between several delta_text pieces. diff --git a/src/cpp/src/text_streamer.cpp b/src/cpp/src/text_streamer.cpp index 56a32fe35b..ccb8509a5d 100644 --- a/src/cpp/src/text_streamer.cpp +++ b/src/cpp/src/text_streamer.cpp @@ -141,8 +141,23 @@ std::vector> m_parsers; JsonContainer m_parsed_message; TextParserStreamerImpl(std::vector> parsers) : m_parsers{parsers} {} + }; +void concatenate_json_containers(JsonContainer& from, const JsonContainer& to, std::vector keys_to_concatenate) { + for (const auto& key : keys_to_concatenate) { + if (to.contains(key) && from.contains(key)) { + // If both are strings, concatenate + if (to[key].is_string() && from[key].is_string()) { + to[key] = to[key].get_string() + from[key].get_string(); + } + } else if (from.contains(key)) { + auto r = from[key]; + to[key] = from[key]; + } + } +} + TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vector> parsers) : TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant { return this->write(s); @@ -177,13 +192,18 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) { } } 
+ JsonContainer msg; // Iterate over all parsers and apply them to the message for (auto& parser: m_pimpl->m_parsers) { - message = parser->parse(m_pimpl->m_parsed_message, message, flushed_tokens); + message = parser->parse(msg, message, flushed_tokens); // Message can be modified inside parser, if parser for example extracted tool calling from message content - m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message; } - return write(m_pimpl->m_parsed_message); + + // concatenate msg with m_parsed_message + concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"content", "reasoning_content"}); + + // return write(m_pimpl->m_parsed_message); + return write(msg); } JsonContainer TextParserStreamer::get_parsed_message() const { diff --git a/tests/python_tests/test_parsers.py b/tests/python_tests/test_parsers.py index 1282b63b24..1d381608b4 100644 --- a/tests/python_tests/test_parsers.py +++ b/tests/python_tests/test_parsers.py @@ -4,7 +4,7 @@ from utils.hugging_face import convert_and_save_tokenizer, download_and_convert_model from utils.ov_genai_pipelines import create_ov_pipeline import pytest -from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser +from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningParser, ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser from transformers import AutoTokenizer import re @@ -51,13 +51,13 @@ def write(self, message): return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()]) - msg = {} for subword in stream_string: streamer._write(subword) think_content = 
answer.split("")[0].replace("", "") content = answer - + + msg = streamer.get_parsed_message() assert msg['reasoning_content'] == think_content assert msg['content'] == content @@ -161,17 +161,17 @@ def test_incremental_phi4_reason_parser_2(hf_ov_genai_models, split_answer): class CustomStreamer(TextParserStreamer): def write(self, message): - msg.update(message) + # will be accumulated automatically inside streamer return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()]) - msg = {} for subword in split_answer: streamer._write(subword) think_content = (''.join(split_answer)).split("")[0].replace("", "") - content = ''.join(split_answer) + content = (''.join(split_answer).split("")[1]) + msg = streamer.get_parsed_message() assert msg['reasoning_content'] == think_content assert msg['content'] == content @@ -378,7 +378,7 @@ def write(self, message): streamer = CustomStreamer(tok, parsers=[Phi4ReasoningIncrementalParser()]) prompt = "Please say \"hello\"" - res = pipe.generate([prompt], max_new_tokens=600, parsers=[Phi4ReasoningParser()]) + res = pipe.generate([prompt], max_new_tokens=600, parsers=[ReasoningParser(keep_original_content=False)]) # extract manually reasoning content from the parsed result content = res.texts[0] From 4017fb5193ead8b05e07239f689c8ea9ee432e36 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Fri, 21 Nov 2025 13:29:04 +0100 Subject: [PATCH 2/8] Fix pytests and gtests for parsers --- src/cpp/src/parsers.cpp | 66 ++++++++++++++++-------- src/cpp/src/text_streamer.cpp | 20 ++++++-- tests/cpp/parser.cpp | 9 +++- tests/python_tests/test_parsers.py | 80 ++++++++++++++++++++---------- 4 files changed, 121 insertions(+), 54 deletions(-) diff --git a/src/cpp/src/parsers.cpp b/src/cpp/src/parsers.cpp index 8d16542bd9..4098f87ace 100644 --- a/src/cpp/src/parsers.cpp +++ b/src/cpp/src/parsers.cpp @@ -25,12 +25,12 @@ class ReasoningIncrementalParser::ReasoningParserImpl { * @brief Ensure 
required fields exist in the message container. */ void ensure_message_fields(JsonContainer& message) { - if (!message.contains("reasoning_content")) { + // if (!message.contains("reasoning_content")) { message["reasoning_content"] = ""; - } - if (!message.contains("content")) { + // } + // if (!message.contains("content")) { message["content"] = ""; - } + // } } /** @@ -59,11 +59,15 @@ class ReasoningIncrementalParser::ReasoningParserImpl { * @brief Handle the case where both open and close tags are found in the same chunk. */ void handle_complete_reasoning(JsonContainer& message, std::string_view txt_chunk, - size_t open_idx, size_t close_idx) { + size_t open_idx, size_t close_idx, std::string& delta_text) { // Extract reasoning content between tags message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size()))); message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); + if (!m_keep_original_content) { + delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); + } + m_think_tag_opened = false; m_deactivated = true; m_text_cache.clear(); @@ -73,10 +77,15 @@ class ReasoningIncrementalParser::ReasoningParserImpl { * @brief Handle the case where only the open tag is found. 
*/ void handle_open_tag(JsonContainer& message, std::string& reason_str, - std::string_view txt_chunk, size_t open_idx) { + std::string_view txt_chunk, size_t open_idx, std::string& delta_text) { // Start accumulating reasoning content - reason_str.append(txt_chunk.substr(open_idx + m_open_tag.size())); - message["reasoning_content"] = std::move(reason_str); + message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size())); + + if (!m_keep_original_content) { + delta_text.clear(); + } else { + delta_text = txt_chunk; + } m_think_tag_opened = true; m_text_cache.clear(); @@ -85,10 +94,21 @@ class ReasoningIncrementalParser::ReasoningParserImpl { /** * @brief Handle the case where the close tag is found. */ - void handle_close_tag(JsonContainer& message, std::string_view txt_chunk, size_t close_idx) { + void handle_close_tag(JsonContainer& message, std::string_view txt_chunk, size_t close_idx, std::string& delta_text) { // Append text before close tag to reasoning content message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx))); - message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));; + auto content = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); + message["content"] = content; + + if (!m_keep_original_content) { + // Despite the fact that we put txt_chunk to delta_text it's correct. + // Since txt_chunk contains some cached parts from the previous calls that were not yet processed + // and we kept them in cache until we decide what to do with them. Here we definitely know that those cached parts + // belonged to reasoning_content so we can discard them. + delta_text = content; + } else { + delta_text = txt_chunk; + } m_text_cache.clear(); m_think_tag_opened = false; @@ -98,21 +118,24 @@ class ReasoningIncrementalParser::ReasoningParserImpl { /** * @brief Handle accumulating text while inside reasoning tags. 
*/ - void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, std::string_view txt_chunk) { + void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, std::string_view txt_chunk, std::string& delta_text) { // Find if the end of txt_chunk might be the start of a close tag const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk); if (num_chars_to_keep > 0) { // Keep potential partial close tag in cache m_text_cache = std::string(txt_chunk.substr(txt_chunk.size() - num_chars_to_keep)); - reason_str.append(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep)); + reason_str = txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep); + delta_text = std::string(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep)); } else { // No partial close tag, accumulate all text - reason_str.append(txt_chunk); + reason_str = txt_chunk; m_text_cache.clear(); } - message["reasoning_content"] = std::move(reason_str); + if (!m_keep_original_content) { + delta_text.clear(); + } } public: @@ -132,6 +155,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { std::string& delta_text, const std::optional>& delta_tokens ) { + ensure_message_fields(message); if (m_deactivated) { message["content"] = delta_text; return delta_text; @@ -141,10 +165,9 @@ class ReasoningIncrementalParser::ReasoningParserImpl { } m_first_run = false; - ensure_message_fields(message); - const std::string txt_chunk = m_text_cache + delta_text; - std::string reason_str = std::move(message["reasoning_content"].get_string()); + std::string txt_chunk = m_text_cache + delta_text; + std::string reason_str = message.contains("reasoning_content") ? std::move(message["reasoning_content"].get_string()) : ""; // Cache find() results to avoid redundant searches const auto open_idx = txt_chunk.find(m_open_tag); @@ -156,18 +179,19 @@ class ReasoningIncrementalParser::ReasoningParserImpl { ? 
close_idx : std::string::npos; if (close_idx_after_open != std::string::npos) { - handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open); + handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open, delta_text); } else { - handle_open_tag(message, reason_str, txt_chunk, open_idx); + handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text); } } else if (m_think_tag_opened && close_idx != std::string::npos) { - handle_close_tag(message, txt_chunk, close_idx); + handle_close_tag(message, txt_chunk, close_idx, delta_text); } else if (m_think_tag_opened) { - handle_inside_reasoning(message, reason_str, txt_chunk); + handle_inside_reasoning(message, reason_str, txt_chunk, delta_text); } else { // Think tag was not opened yet and not found in the current delta_text. // Accumulate text in the cache to detect if is split between several delta_text pieces. m_text_cache += delta_text; + delta_text.clear(); } return delta_text; diff --git a/src/cpp/src/text_streamer.cpp b/src/cpp/src/text_streamer.cpp index ccb8509a5d..fb2b8fc522 100644 --- a/src/cpp/src/text_streamer.cpp +++ b/src/cpp/src/text_streamer.cpp @@ -144,7 +144,7 @@ TextParserStreamerImpl(std::vector> parsers) }; -void concatenate_json_containers(JsonContainer& from, const JsonContainer& to, std::vector keys_to_concatenate) { +void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector keys_to_concatenate) { for (const auto& key : keys_to_concatenate) { if (to.contains(key) && from.contains(key)) { // If both are strings, concatenate @@ -152,7 +152,6 @@ void concatenate_json_containers(JsonContainer& from, const JsonContainer& to, s to[key] = to[key].get_string() + from[key].get_string(); } } else if (from.contains(key)) { - auto r = from[key]; to[key] = from[key]; } } @@ -161,7 +160,9 @@ void concatenate_json_containers(JsonContainer& from, const JsonContainer& to, s TextParserStreamer::TextParserStreamer(const Tokenizer& 
tokenizer, std::vector> parsers) : TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant { return this->write(s); - }), m_pimpl{std::make_unique(parsers)} {} + }), m_pimpl{std::make_unique(parsers)} { + m_pimpl->m_parsed_message["content"] = ""; + } CallbackTypeVariant TextParserStreamer::write(std::string message) { // When 'write' is called with string, it means new chunk of tokens is decoded into text @@ -199,10 +200,18 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) { // Message can be modified inside parser, if parser for example extracted tool calling from message content } + // std::cout << msg["content"].get_string() << std::endl; + // std::cout << msg["reasoning_content"].get_string() << std::endl; + // concatenate msg with m_parsed_message - concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"content", "reasoning_content"}); + concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"reasoning_content"}); + + // We have to put into DeltaMessage's "content" fields only chunks that belong to content. + // But into m_parsed_message["content"] we need to accumulate full content if m_keep_original_content == True + // and if m_keep_original_content == False only part that is outside reasoning tags and outside tool calls. 
+ + m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message; - // return write(m_pimpl->m_parsed_message); return write(msg); } @@ -212,6 +221,7 @@ JsonContainer TextParserStreamer::get_parsed_message() const { void TextParserStreamer::reset() { m_pimpl->m_parsed_message = JsonContainer(); + m_pimpl->m_parsed_message["content"] = ""; for (auto& parser : m_pimpl->m_parsers) { parser->reset(); } diff --git a/tests/cpp/parser.cpp b/tests/cpp/parser.cpp index e4db4da3f2..e7262413b2 100644 --- a/tests/cpp/parser.cpp +++ b/tests/cpp/parser.cpp @@ -9,6 +9,10 @@ using namespace ov::genai; +namespace ov::genai { + void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector keys_to_concatenate); +} + TEST(ParserTest, test_llama3_parser_1) { std::string prompt = R"(What's the weather in New York today?<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n[get_weather(location="New York, NY", unit="celsius")]<|eom_id|>)"; // By default content should keep original values. 
@@ -92,12 +96,13 @@ TEST_F(DeepSeekR1ReasoningParserTest, ReasoningContentAccumulatesAcrossCalls) { std::string ref_res = "First, I recognize that the question is asking for the sum of 2 and 1.\n\nI know that addition involves combining two numbers to find their total.\n\nStarting with 2, I add 1 to it.\n\n2 plus 1 equals 3.\n"; JsonContainer msg; - + JsonContainer accumulated_msg; for (int i = 1; i < input_stream.size(); i++) { std::string delta_text = input_stream[i]; delta_text = parser.parse(msg, delta_text); + concatenate_json_containers(msg, accumulated_msg, {"reasoning_content", "content"}); } - ASSERT_EQ(msg["reasoning_content"], ref_res); + ASSERT_EQ(accumulated_msg["reasoning_content"], ref_res); } TEST(ParserTest, test_custom_parser) { diff --git a/tests/python_tests/test_parsers.py b/tests/python_tests/test_parsers.py index 1d381608b4..4f0f2823e8 100644 --- a/tests/python_tests/test_parsers.py +++ b/tests/python_tests/test_parsers.py @@ -7,8 +7,20 @@ from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningParser, ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser from transformers import AutoTokenizer import re +from io import StringIO +def concatenate_dicts(dst_dict, src_dict): + # keys that exist in both dictionaries + keys = set(dst_dict.keys()).intersection(set(src_dict.keys())) + for key in keys: + dst_dict[key] += src_dict[key] + + # keys that exist in src_dict but missing in dst_dict + missing_keys = set(src_dict.keys()) - set(dst_dict.keys()) + for key in missing_keys: + dst_dict[key] = src_dict[key] + @pytest.fixture(scope="module") def hf_ov_genai_models(request, tmp_path_factory): model_id = request.param @@ -45,9 +57,13 @@ def test_incremental_phi4_reason_parser_1(hf_ov_genai_models, answer): stream_string = re.split(r"(\s+)", answer) + # manually accumulate content from streamer + 
content = "" + class CustomStreamer(TextParserStreamer): def write(self, message): - msg.update(message) + nonlocal content + content += message['content'] return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()]) @@ -55,11 +71,11 @@ def write(self, message): streamer._write(subword) think_content = answer.split("")[0].replace("", "") - content = answer msg = streamer.get_parsed_message() assert msg['reasoning_content'] == think_content - assert msg['content'] == content + assert msg['content'] == answer + assert msg['content'].endswith(content) @pytest.mark.parametrize( @@ -70,13 +86,13 @@ def write(self, message): def test_incremental_phi4_reason_integer_token_ids(hf_ov_genai_models): hf_tokenizer, genai_tokenizer = hf_ov_genai_models + accumulated_msg = {} class CustomStreamer(TextParserStreamer): def write(self, message): - msg.update(message) + concatenate_dicts(accumulated_msg, message) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()]) - msg = {} answer = "\nOkay, the user is asking for the answer to 2 + 1.\n\nThe answer to 2 + 1 is \boxed{3}." encoded_tokens = genai_tokenizer.encode(answer).input_ids.data.tolist()[0] for token in encoded_tokens: @@ -86,8 +102,11 @@ def write(self, message): think_content = answer.split("")[0].replace("", "") content = answer + msg = streamer.get_parsed_message() assert msg['reasoning_content'] == think_content - assert msg['content'] == content + assert msg['content'] == answer + assert accumulated_msg['reasoning_content'] == think_content + assert answer.endswith(accumulated_msg['content']) @pytest.mark.parametrize( @@ -117,14 +136,15 @@ def parse(self, msg: dict, delta_text: str, delta_tokens = None) -> str: elif self.started_reasoning: msg['reasoning_content'] += delta_text delta_text = '' - + # # Here we are only collecting ordinary text, therefore leave delta_text unchanged. 
- # # msg['content'] += delta_text will happen under the hood + msg['content'] += delta_text # will happen under the hood return delta_text - + + msg = {} class CustomStreamer(TextParserStreamer): def write(self, message): - msg.update(message) + concatenate_dicts(msg, message) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[CustomIncrementalParser()]) @@ -137,7 +157,7 @@ def write(self, message): for token in encoded_tokens: streamer._write([token]) streamer.end() - + assert msg['reasoning_content'] == "\nOkay, the user is asking for the answer to 2 + 1" assert msg['content'] == " The answer to 2 + 1 is 3." @@ -159,9 +179,11 @@ def test_incremental_phi4_reason_parser_2(hf_ov_genai_models, split_answer): # check that if thinking opening and closing tags are in the middle of the subword, it is still parsed correctly hf_tokenizer, genai_tokenizer = hf_ov_genai_models + msg_manual = {} class CustomStreamer(TextParserStreamer): def write(self, message): # will be accumulated automatically inside streamer + concatenate_dicts(msg_manual, message) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()]) @@ -173,7 +195,10 @@ def write(self, message): msg = streamer.get_parsed_message() assert msg['reasoning_content'] == think_content - assert msg['content'] == content + assert msg['content'].endswith(content) # since msg contains all accumulated content + assert msg_manual['reasoning_content'] == think_content + assert msg_manual['content'] == content + @pytest.mark.parametrize("answer", [ @@ -184,18 +209,19 @@ def test_incremental_phi4_reason_parser_nostreamer(answer): parser = Phi4ReasoningIncrementalParser() stream_string = re.split(r"(\s+)", answer) + msg = {} + accumulated_msg = {} for subword in stream_string: parser.parse(msg, subword) - # When parser is called from streamer, it is expected that content is accumulated inside streamer. 
- # Here we are calling parser manually therefore we need to accumulate content manually. - msg['content'] += subword + print(msg) + concatenate_dicts(accumulated_msg, msg) think_content = answer.split("")[0].replace("", "") - content = answer + content = answer.split("")[1] - assert msg['reasoning_content'] == think_content - assert msg['content'] == content + assert accumulated_msg['reasoning_content'] == think_content + assert accumulated_msg['content'] == content @pytest.mark.parametrize("keep_original_content", [True, False]) @@ -213,14 +239,14 @@ def test_reasoning_parser_cut_content(hf_ov_genai_models, answer, keep_original_ stream_string = re.split(r"(\s+)", answer) + msg = {} class CustomStreamer(TextParserStreamer): def write(self, message): - msg.update(message) + concatenate_dicts(msg, message) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[ReasoningIncrementalParser(expect_open_tag=True, keep_original_content=keep_original_content)]) num_runs = 2 - msg = {} for i in range(num_runs): if do_reset: streamer.reset() @@ -256,12 +282,15 @@ def test_incremental_deepseek_parser(): think_content = full_str.split("")[0] content = full_str.split("")[1] + msg = {} + accumulated_msg = {} parser = DeepSeekR1ReasoningIncrementalParser() for subword in stream_string: - msg = parser.parse(msg, subword) + parser.parse(msg, subword) + concatenate_dicts(accumulated_msg, msg) - assert msg['reasoning_content'] == think_content - assert msg['content'] == content + assert accumulated_msg['reasoning_content'] == think_content + assert accumulated_msg['content'] == content @pytest.mark.parametrize( @@ -288,13 +317,13 @@ def parse(self, msg: dict, delta_text: str, delta_tokens = None) -> str: else: if self.main_part_started: msg['main_text'] += delta_text - + msg['content'] += delta_text return delta_text msg = {} class CustomStreamer(TextParserStreamer): def write(self, message): - msg.update(message) + concatenate_dicts(msg, message) return 
StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[CustomParser()]) @@ -302,7 +331,6 @@ def write(self, message): for subword in stream_string: streamer._write(subword) - assert msg['main_text'] == " world " From 6e5dcc2e5d14dab3422a13ee69d7f789d880704a Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Thu, 27 Nov 2025 12:55:36 +0100 Subject: [PATCH 3/8] get rid of 'ensure_message_fields'; extend texts --- src/cpp/src/parsers.cpp | 14 ---- src/cpp/src/text_streamer.cpp | 23 ++++--- tests/python_tests/test_parsers.py | 103 +++++++++++++++++++++++++++-- 3 files changed, 111 insertions(+), 29 deletions(-) diff --git a/src/cpp/src/parsers.cpp b/src/cpp/src/parsers.cpp index 4098f87ace..2931bc6de2 100644 --- a/src/cpp/src/parsers.cpp +++ b/src/cpp/src/parsers.cpp @@ -20,18 +20,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl { bool m_think_tag_opened = false; std::string m_text_cache = ""; bool m_deactivated = false; - - /** - * @brief Ensure required fields exist in the message container. - */ - void ensure_message_fields(JsonContainer& message) { - // if (!message.contains("reasoning_content")) { - message["reasoning_content"] = ""; - // } - // if (!message.contains("content")) { - message["content"] = ""; - // } - } /** * @brief Find the longest suffix of text that is a prefix of the close tag. 
@@ -155,9 +143,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { std::string& delta_text, const std::optional>& delta_tokens ) { - ensure_message_fields(message); if (m_deactivated) { - message["content"] = delta_text; return delta_text; } if (!m_expect_open_tag && m_first_run) { diff --git a/src/cpp/src/text_streamer.cpp b/src/cpp/src/text_streamer.cpp index fb2b8fc522..b620c389bb 100644 --- a/src/cpp/src/text_streamer.cpp +++ b/src/cpp/src/text_streamer.cpp @@ -164,11 +164,11 @@ TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vectorm_parsed_message["content"] = ""; } -CallbackTypeVariant TextParserStreamer::write(std::string message) { +CallbackTypeVariant TextParserStreamer::write(std::string delta_text) { // When 'write' is called with string, it means new chunk of tokens is decoded into text auto flushed_tokens = std::vector(); - if (message.back() == '\n') { + if (delta_text.back() == '\n') { // Flush all tokens flushed_tokens.assign(m_tokens_cache.begin(), m_tokens_cache.end()); } else if (m_decoded_lengths.size() >= delay_n_tokens) { @@ -196,22 +196,29 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) { JsonContainer msg; // Iterate over all parsers and apply them to the message for (auto& parser: m_pimpl->m_parsers) { - message = parser->parse(msg, message, flushed_tokens); + delta_text = parser->parse(msg, delta_text, flushed_tokens); // Message can be modified inside parser, if parser for example extracted tool calling from message content } + msg["content"] = delta_text; - // std::cout << msg["content"].get_string() << std::endl; - // std::cout << msg["reasoning_content"].get_string() << std::endl; - // concatenate msg with m_parsed_message - concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"reasoning_content"}); + concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"reasoning_content", "content", "tool_calls"}); // We have to put into DeltaMessage's "content" fields only chunks 
that belong to content. // But into m_parsed_message["content"] we need to accumulate full content if m_keep_original_content == True // and if m_keep_original_content == False only part that is outside reasoning tags and outside tool calls. - m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message; + // fill in msg["content"] with delta_text + // The line below should be removed and it's function should be performed inside concatenate_json_containers + // m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + delta_text; + + // TODO: on each for cycle iteration they receive an empty delta_message + // and pipe iterates though every parser. They should neither delete fields nor rewrite + // they should only append or add new fields. + // The only field is updated automaticall is "content" + // The remaining delta_text is put there. + // It's parsers responsibility to ensure fields are proper. return write(msg); } diff --git a/tests/python_tests/test_parsers.py b/tests/python_tests/test_parsers.py index 4f0f2823e8..48483c39bd 100644 --- a/tests/python_tests/test_parsers.py +++ b/tests/python_tests/test_parsers.py @@ -35,6 +35,98 @@ def hf_ov_genai_models(request, tmp_path_factory): return hf_tokenizer, genai_tokenizer +@pytest.mark.parametrize( + "hf_ov_genai_models", + ["katuni4ka/tiny-random-phi3"], # this tokenizer is used as a stub only + indirect=True +) +@pytest.mark.parametrize("answer", [ + "\nOkay, the user is asking for the answer to 2 + 1.\n\nThe answer to 2 + 1 is \boxed{3}.", + + ( + "\nOkay, the user is asking for the answer to 2 + 1. Let me make sure I understand " + "the question correctly. They want a short answer, so I shouldn't overcomplicate things. " + "Basic addition here. Two plus one equals three. Yeah, that's straightforward. I need to " + "respond with the answer inside a box using the specified format. 
Let me double-check the " + "arithmetic to avoid any mistakes. Yep, 2 + 1 is definitely 3. Alright, time to put it in " + "the box.\n\n\nThe answer to 2 + 1 is \boxed{3}." + ), +]) +def test_several_incremental_parsers(hf_ov_genai_models, answer): + hf_tokenizer, genai_tokenizer = hf_ov_genai_models + + stream_string = re.split(r"(\s+)", answer) + + class CustomReasonParser(IncrementalParser): + thinking_started: bool = False + deactivated: bool = False + + def parse(self, message: dict, delta_text: str, delta_tokens = None) -> dict: + + if self.deactivated: + return delta_text + + if not self.thinking_started and delta_text == '': + self.thinking_started = True + elif self.thinking_started and delta_text != '': + message["reasoning_content"] = delta_text + elif self.thinking_started and delta_text == '': + self.deactivated = True + + return delta_text + + + class IncrementalToolParser(IncrementalParser): + started_took_call: bool = False + accumulated_tool_call: StringIO = StringIO() + deactivated: bool = False + + def parse(self, delta_msg: dict, delta_text: str, delta_tokens = None) -> str: + if self.deactivated: + return delta_text + + if delta_text == '{' and not self.started_took_call: + self.started_took_call = True + self.accumulated_tool_call.write(delta_text) + + # If not keep took call in resulting string + # delta_text = '' + elif self.started_took_call and delta_text == '}': + self.started_took_call = False + self.accumulated_tool_call.write(delta_text) + self.deactivated = True + delta_msg['tool_calls'] = [json.loads(self.accumulated_tool_call.getvalue())] + # If not keep took call in resulting string + # delta_text = '' + elif self.started_took_call: + self.accumulated_tool_call.write(delta_text) + + return delta_text + + + class CustomStreamer(TextParserStreamer): + def write(self, message): + print(message) + return StreamingStatus.RUNNING + + streamer = CustomStreamer(genai_tokenizer, parsers=[IncrementalToolParser(), CustomReasonParser()]) + + 
msg = {} + stream_string = ["Hello", "", " ", "world", " ", "", "!", "{", '"func_name": ', '"weather", ' '"location": "New York"', "}"] + think_content = " world " + # content = ''.join(stream_string).replace("", "").replace("", "") + content = ''.join(stream_string) + tool_call = {"func_name": "weather", "location": "New York"} + + for subword in stream_string: + streamer._write(subword) + + final_msg = streamer.get_parsed_message() + assert final_msg["reasoning_content"] == think_content + assert final_msg["content"] == content + assert final_msg["tool_calls"][0] == tool_call + + @pytest.mark.parametrize( "hf_ov_genai_models", ["katuni4ka/tiny-random-phi3"], # this tokenizer is used as a stub only @@ -149,7 +241,7 @@ def write(self, message): streamer = CustomStreamer(genai_tokenizer, parsers=[CustomIncrementalParser()]) msg = {} - # All closing tags , <|/inst|>, <|endoftext|>, ent. in tiny-random-phi3 add strange \x0c\x0c characters + # All closing tags , <|/inst|>, <|endoftext|>, etc. in tiny-random-phi3 add strange \x0c\x0c characters # so we avoid them in this test. answer = "\nOkay, the user is asking for the answer to 2 + 1.The answer to 2 + 1 is 3." 
encoded_tokens = genai_tokenizer.encode(answer, add_special_tokens=False).input_ids.data.tolist()[0] @@ -191,9 +283,10 @@ def write(self, message): streamer._write(subword) think_content = (''.join(split_answer)).split("")[0].replace("", "") - content = (''.join(split_answer).split("")[1]) + content = ''.join(split_answer) msg = streamer.get_parsed_message() + # breakpoint() assert msg['reasoning_content'] == think_content assert msg['content'].endswith(content) # since msg contains all accumulated content assert msg_manual['reasoning_content'] == think_content @@ -210,18 +303,15 @@ def test_incremental_phi4_reason_parser_nostreamer(answer): stream_string = re.split(r"(\s+)", answer) - msg = {} accumulated_msg = {} for subword in stream_string: + msg = {} # msg when the first parser is called should be empty parser.parse(msg, subword) - print(msg) concatenate_dicts(accumulated_msg, msg) think_content = answer.split("")[0].replace("", "") - content = answer.split("")[1] assert accumulated_msg['reasoning_content'] == think_content - assert accumulated_msg['content'] == content @pytest.mark.parametrize("keep_original_content", [True, False]) @@ -290,7 +380,6 @@ def test_incremental_deepseek_parser(): concatenate_dicts(accumulated_msg, msg) assert accumulated_msg['reasoning_content'] == think_content - assert accumulated_msg['content'] == content @pytest.mark.parametrize( From 23fa2229b1006e41e44686be535698abac0eedef Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Thu, 27 Nov 2025 23:11:57 +0100 Subject: [PATCH 4/8] use 'JsonContainer::concatenate'; improve variable naming --- .../include/openvino/genai/json_container.hpp | 8 + src/cpp/include/openvino/genai/parsers.hpp | 4 +- src/cpp/src/json_container.cpp | 18 +++ src/cpp/src/parsers.cpp | 33 ++-- src/cpp/src/text_streamer.cpp | 43 +----- tests/cpp/parser.cpp | 9 +- tests/python_tests/test_parsers.py | 143 +++++++----------- 7 files changed, 114 insertions(+), 144 deletions(-) diff --git 
a/src/cpp/include/openvino/genai/json_container.hpp b/src/cpp/include/openvino/genai/json_container.hpp index d109241eef..68368cebcc 100644 --- a/src/cpp/include/openvino/genai/json_container.hpp +++ b/src/cpp/include/openvino/genai/json_container.hpp @@ -80,6 +80,14 @@ class OPENVINO_GENAI_EXPORTS JsonContainer { */ static JsonContainer from_json_string(const std::string& json_str); + /** + * @brief Concatenate two JsonContainers. + * @param dst Destination JsonContainer to append to + * @param src Source JsonContainer to append from + * @throw ov::Exception if keys in both containers are not strings. + */ + static void concatenate(JsonContainer& dst, const JsonContainer& src); + /** * @brief Create JsonContainer as an empty JSON object. */ diff --git a/src/cpp/include/openvino/genai/parsers.hpp b/src/cpp/include/openvino/genai/parsers.hpp index 156d158aca..96187d043c 100644 --- a/src/cpp/include/openvino/genai/parsers.hpp +++ b/src/cpp/include/openvino/genai/parsers.hpp @@ -175,7 +175,7 @@ class OPENVINO_GENAI_EXPORTS IncrementalParser { * @return std::string Filtered text that should be added to the content */ virtual std::string parse( - JsonContainer& message, + JsonContainer& delta_message, std::string& delta_text, const std::optional>& delta_tokens = std::nullopt ) = 0; @@ -222,7 +222,7 @@ class OPENVINO_GENAI_EXPORTS ReasoningIncrementalParser : public IncrementalPars * @return std::string Filtered text with reasoning content processed according to configuration */ std::string parse( - JsonContainer& message, + JsonContainer& delta_message, std::string& delta_text, const std::optional>& delta_tokens = std::nullopt ) override; diff --git a/src/cpp/src/json_container.cpp b/src/cpp/src/json_container.cpp index 0ef004a8b6..797bff67bf 100644 --- a/src/cpp/src/json_container.cpp +++ b/src/cpp/src/json_container.cpp @@ -409,5 +409,23 @@ void* JsonContainer::_get_json_value_ptr() const { return m_impl->get_json_value_ptr(m_path, AccessMode::Read); } +void 
JsonContainer::concatenate(JsonContainer& dst, const JsonContainer& src) { + auto dst_ = static_cast(dst._get_json_value_ptr()); + auto src_ = static_cast(src._get_json_value_ptr()); + + for (auto it = src_->begin(); it != src_->end(); ++it) { + const auto& src_val = it.value(); + + if (!dst_->contains(it.key())) { + (*dst_)[it.key()] = src_val; + continue; + } + + OPENVINO_ASSERT(src_val.is_string(), "JsonContainer concatenate supports only string concatenation for object values."); + auto& dst_val = (*dst_)[it.key()]; + dst_val = dst_val.get() + src_val.get(); + } +} + } // namespace genai } // namespace ov diff --git a/src/cpp/src/parsers.cpp b/src/cpp/src/parsers.cpp index 2931bc6de2..082520a2ca 100644 --- a/src/cpp/src/parsers.cpp +++ b/src/cpp/src/parsers.cpp @@ -64,10 +64,9 @@ class ReasoningIncrementalParser::ReasoningParserImpl { /** * @brief Handle the case where only the open tag is found. */ - void handle_open_tag(JsonContainer& message, std::string& reason_str, - std::string_view txt_chunk, size_t open_idx, std::string& delta_text) { + void handle_open_tag(JsonContainer& delta_message, std::string_view txt_chunk, size_t open_idx, std::string& delta_text) { // Start accumulating reasoning content - message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size())); + delta_message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size())); if (!m_keep_original_content) { delta_text.clear(); @@ -82,14 +81,14 @@ class ReasoningIncrementalParser::ReasoningParserImpl { /** * @brief Handle the case where the close tag is found. 
*/ - void handle_close_tag(JsonContainer& message, std::string_view txt_chunk, size_t close_idx, std::string& delta_text) { + void handle_close_tag(JsonContainer& delta_message, std::string_view txt_chunk, size_t close_idx, std::string& delta_text) { // Append text before close tag to reasoning content - message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx))); + delta_message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx))); auto content = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); - message["content"] = content; + delta_message["content"] = content; if (!m_keep_original_content) { - // Despite the fact that we put txt_chung to delta_text it's correct. + // Despite the fact that we put txt_chunk to delta_text it's correct. // Since txt_chunk contains some cached parts from the previous calls that were not yet processed yet // and we kept them in cache until we decide what to do with them. Here we definitely know that that cached parts // belonged to reasoning_content so we can discard them. @@ -106,10 +105,11 @@ class ReasoningIncrementalParser::ReasoningParserImpl { /** * @brief Handle accumulating text while inside reasoning tags. 
*/ - void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, std::string_view txt_chunk, std::string& delta_text) { + void handle_inside_reasoning(JsonContainer& delta_message, std::string_view txt_chunk, std::string& delta_text) { // Find if the end of txt_chunk might be the start of a close tag const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk); + std::string reason_str; if (num_chars_to_keep > 0) { // Keep potential partial close tag in cache m_text_cache = std::string(txt_chunk.substr(txt_chunk.size() - num_chars_to_keep)); @@ -120,7 +120,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { reason_str = txt_chunk; m_text_cache.clear(); } - message["reasoning_content"] = std::move(reason_str); + delta_message["reasoning_content"] = std::move(reason_str); if (!m_keep_original_content) { delta_text.clear(); } @@ -139,7 +139,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { m_close_tag(close_tag) {} std::string parse( - JsonContainer& message, + JsonContainer& delta_message, std::string& delta_text, const std::optional>& delta_tokens ) { @@ -153,7 +153,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl { std::string txt_chunk = m_text_cache + delta_text; - std::string reason_str = message.contains("reasoning_content") ? std::move(message["reasoning_content"].get_string()) : ""; // Cache find() results to avoid redundant searches const auto open_idx = txt_chunk.find(m_open_tag); @@ -165,14 +164,14 @@ class ReasoningIncrementalParser::ReasoningParserImpl { ? 
close_idx : std::string::npos; if (close_idx_after_open != std::string::npos) { - handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open, delta_text); + handle_complete_reasoning(delta_message, txt_chunk, open_idx, close_idx_after_open, delta_text); } else { - handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text); + handle_open_tag(delta_message, txt_chunk, open_idx, delta_text); } } else if (m_think_tag_opened && close_idx != std::string::npos) { - handle_close_tag(message, txt_chunk, close_idx, delta_text); + handle_close_tag(delta_message, txt_chunk, close_idx, delta_text); } else if (m_think_tag_opened) { - handle_inside_reasoning(message, reason_str, txt_chunk, delta_text); + handle_inside_reasoning(delta_message, txt_chunk, delta_text); } else { // Think tag was not opened yet and not found in the current delta_text. // Accumulate text in the cache to detect if is split between several delta_text pieces. @@ -198,11 +197,11 @@ ReasoningIncrementalParser::ReasoningIncrementalParser(bool expect_open_tag, boo ReasoningIncrementalParser::~ReasoningIncrementalParser() = default; std::string ReasoningIncrementalParser::parse( - JsonContainer& message, + JsonContainer& delta_message, std::string& delta_text, const std::optional>& delta_tokens ) { - return m_impl->parse(message, delta_text, delta_tokens); + return m_impl->parse(delta_message, delta_text, delta_tokens); } void ReasoningIncrementalParser::reset() { diff --git a/src/cpp/src/text_streamer.cpp b/src/cpp/src/text_streamer.cpp index b620c389bb..98aa63cfbb 100644 --- a/src/cpp/src/text_streamer.cpp +++ b/src/cpp/src/text_streamer.cpp @@ -144,19 +144,6 @@ TextParserStreamerImpl(std::vector> parsers) }; -void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector keys_to_concatenate) { - for (const auto& key : keys_to_concatenate) { - if (to.contains(key) && from.contains(key)) { - // If both are strings, concatenate - if (to[key].is_string() 
&& from[key].is_string()) { - to[key] = to[key].get_string() + from[key].get_string(); - } - } else if (from.contains(key)) { - to[key] = from[key]; - } - } -} - TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vector> parsers) : TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant { return this->write(s); @@ -193,33 +180,19 @@ CallbackTypeVariant TextParserStreamer::write(std::string delta_text) { } } - JsonContainer msg; + // Every time we start to cycle through iterative parsers we create a new delta_message. + // Parsers should neither delete fields nor rewrite they should only append or add new fields. + // The only field is updated automaticall is "content": delta_text is put there. + JsonContainer delta_message; // Iterate over all parsers and apply them to the message for (auto& parser: m_pimpl->m_parsers) { - delta_text = parser->parse(msg, delta_text, flushed_tokens); + delta_text = parser->parse(delta_message, delta_text, flushed_tokens); // Message can be modified inside parser, if parser for example extracted tool calling from message content } - msg["content"] = delta_text; + delta_message["content"] = delta_text; - // concatenate msg with m_parsed_message - concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"reasoning_content", "content", "tool_calls"}); - - // We have to put into DeltaMessage's "content" fields only chunks that belong to content. - // But into m_parsed_message["content"] we need to accumulate full content if m_keep_original_content == True - // and if m_keep_original_content == False only part that is outside reasoning tags and outside tool calls. 
- - // fill in msg["content"] with delta_text - // The line below should be removed and it's function should be performed inside concatenate_json_containers - // m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + delta_text; - - - // TODO: on each for cycle iteration they receive an empty delta_message - // and pipe iterates though every parser. They should neither delete fields nor rewrite - // they should only append or add new fields. - // The only field is updated automaticall is "content" - // The remaining delta_text is put there. - // It's parsers responsibility to ensure fields are proper. - return write(msg); + JsonContainer::concatenate(m_pimpl->m_parsed_message, delta_message); + return write(delta_message); } JsonContainer TextParserStreamer::get_parsed_message() const { diff --git a/tests/cpp/parser.cpp b/tests/cpp/parser.cpp index e7262413b2..90abc7e64c 100644 --- a/tests/cpp/parser.cpp +++ b/tests/cpp/parser.cpp @@ -9,9 +9,9 @@ using namespace ov::genai; -namespace ov::genai { - void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector keys_to_concatenate); -} +// namespace ov::genai { +// void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector keys_to_concatenate); +// } TEST(ParserTest, test_llama3_parser_1) { std::string prompt = R"(What's the weather in New York today?<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n[get_weather(location="New York, NY", unit="celsius")]<|eom_id|>)"; @@ -100,7 +100,8 @@ TEST_F(DeepSeekR1ReasoningParserTest, ReasoningContentAccumulatesAcrossCalls) { for (int i = 1; i < input_stream.size(); i++) { std::string delta_text = input_stream[i]; delta_text = parser.parse(msg, delta_text); - concatenate_json_containers(msg, accumulated_msg, {"reasoning_content", "content"}); + // concatenate_json_containers(msg, accumulated_msg, {"reasoning_content", "content"}); + JsonContainer::concatenate(accumulated_msg, 
msg); } ASSERT_EQ(accumulated_msg["reasoning_content"], ref_res); } diff --git a/tests/python_tests/test_parsers.py b/tests/python_tests/test_parsers.py index 48483c39bd..7bb7c018dc 100644 --- a/tests/python_tests/test_parsers.py +++ b/tests/python_tests/test_parsers.py @@ -4,7 +4,7 @@ from utils.hugging_face import convert_and_save_tokenizer, download_and_convert_model from utils.ov_genai_pipelines import create_ov_pipeline import pytest -from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningParser, ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser +from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser from transformers import AutoTokenizer import re from io import StringIO @@ -40,23 +40,9 @@ def hf_ov_genai_models(request, tmp_path_factory): ["katuni4ka/tiny-random-phi3"], # this tokenizer is used as a stub only indirect=True ) -@pytest.mark.parametrize("answer", [ - "\nOkay, the user is asking for the answer to 2 + 1.\n\nThe answer to 2 + 1 is \boxed{3}.", - - ( - "\nOkay, the user is asking for the answer to 2 + 1. Let me make sure I understand " - "the question correctly. They want a short answer, so I shouldn't overcomplicate things. " - "Basic addition here. Two plus one equals three. Yeah, that's straightforward. I need to " - "respond with the answer inside a box using the specified format. Let me double-check the " - "arithmetic to avoid any mistakes. Yep, 2 + 1 is definitely 3. Alright, time to put it in " - "the box.\n\n\nThe answer to 2 + 1 is \boxed{3}." 
- ), -]) -def test_several_incremental_parsers(hf_ov_genai_models, answer): +def test_several_incremental_parsers(hf_ov_genai_models): hf_tokenizer, genai_tokenizer = hf_ov_genai_models - stream_string = re.split(r"(\s+)", answer) - class CustomReasonParser(IncrementalParser): thinking_started: bool = False deactivated: bool = False @@ -95,7 +81,7 @@ def parse(self, delta_msg: dict, delta_text: str, delta_tokens = None) -> str: self.started_took_call = False self.accumulated_tool_call.write(delta_text) self.deactivated = True - delta_msg['tool_calls'] = [json.loads(self.accumulated_tool_call.getvalue())] + delta_msg["tool_calls"] = [json.loads(self.accumulated_tool_call.getvalue())] # If not keep took call in resulting string # delta_text = '' elif self.started_took_call: @@ -111,7 +97,6 @@ def write(self, message): streamer = CustomStreamer(genai_tokenizer, parsers=[IncrementalToolParser(), CustomReasonParser()]) - msg = {} stream_string = ["Hello", "", " ", "world", " ", "", "!", "{", '"func_name": ', '"weather", ' '"location": "New York"', "}"] think_content = " world " # content = ''.join(stream_string).replace("", "").replace("", "") @@ -150,12 +135,12 @@ def test_incremental_phi4_reason_parser_1(hf_ov_genai_models, answer): stream_string = re.split(r"(\s+)", answer) # manually accumulate content from streamer - content = "" + content = StringIO() class CustomStreamer(TextParserStreamer): def write(self, message): nonlocal content - content += message['content'] + content.write(message["content"]) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()]) @@ -165,9 +150,9 @@ def write(self, message): think_content = answer.split("")[0].replace("", "") msg = streamer.get_parsed_message() - assert msg['reasoning_content'] == think_content - assert msg['content'] == answer - assert msg['content'].endswith(content) + assert msg["reasoning_content"] == think_content + assert msg["content"] == answer + 
assert msg["content"].endswith(content.getvalue()) @pytest.mark.parametrize( @@ -178,10 +163,10 @@ def write(self, message): def test_incremental_phi4_reason_integer_token_ids(hf_ov_genai_models): hf_tokenizer, genai_tokenizer = hf_ov_genai_models - accumulated_msg = {} + accumulated_message = {} class CustomStreamer(TextParserStreamer): - def write(self, message): - concatenate_dicts(accumulated_msg, message) + def write(self, delta_message): + concatenate_dicts(accumulated_message, delta_message) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()]) @@ -192,13 +177,12 @@ def write(self, message): streamer.end() think_content = answer.split("")[0].replace("", "") - content = answer msg = streamer.get_parsed_message() - assert msg['reasoning_content'] == think_content - assert msg['content'] == answer - assert accumulated_msg['reasoning_content'] == think_content - assert answer.endswith(accumulated_msg['content']) + assert msg["reasoning_content"] == think_content + assert msg["content"] == answer + assert accumulated_message["reasoning_content"] == think_content + assert answer.endswith(accumulated_message["content"]) @pytest.mark.parametrize( @@ -212,35 +196,30 @@ def test_incremental_integer_token_ids(hf_ov_genai_models): class CustomIncrementalParser(IncrementalParser): started_reasoning: bool = False - def parse(self, msg: dict, delta_text: str, delta_tokens = None) -> str: - if 'content' not in msg: - msg['content'] = '' - if 'reasoning_content' not in msg: - msg['reasoning_content'] = '' - + def parse(self, delta_message: dict, delta_text: str, delta_tokens = None) -> str: if 1 in delta_tokens and not self.started_reasoning: self.started_reasoning = True - msg['reasoning_content'] += delta_text + delta_message["reasoning_content"] = delta_text delta_text = '' elif 1 in delta_tokens and self.started_reasoning: self.started_reasoning = False delta_text = '' elif self.started_reasoning: - 
msg['reasoning_content'] += delta_text + delta_message["reasoning_content"] = delta_text delta_text = '' # # Here we are only collecting ordinary text, therefore leave delta_text unchanged. - msg['content'] += delta_text # will happen under the hood + delta_message["content"] = delta_text # will happen under the hood return delta_text - msg = {} + accumulated_message = {} class CustomStreamer(TextParserStreamer): - def write(self, message): - concatenate_dicts(msg, message) + def write(self, delta_message): + concatenate_dicts(accumulated_message, delta_message) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[CustomIncrementalParser()]) - msg = {} + accumulated_message = {} # All closing tags , <|/inst|>, <|endoftext|>, etc. in tiny-random-phi3 add strange \x0c\x0c characters # so we avoid them in this test. answer = "\nOkay, the user is asking for the answer to 2 + 1.The answer to 2 + 1 is 3." @@ -250,8 +229,8 @@ def write(self, message): streamer._write([token]) streamer.end() - assert msg['reasoning_content'] == "\nOkay, the user is asking for the answer to 2 + 1" - assert msg['content'] == " The answer to 2 + 1 is 3." + assert accumulated_message["reasoning_content"] == "\nOkay, the user is asking for the answer to 2 + 1" + assert accumulated_message["content"] == " The answer to 2 + 1 is 3." 
@pytest.mark.parametrize( @@ -287,10 +266,10 @@ def write(self, message): msg = streamer.get_parsed_message() # breakpoint() - assert msg['reasoning_content'] == think_content - assert msg['content'].endswith(content) # since msg contains all accumulated content - assert msg_manual['reasoning_content'] == think_content - assert msg_manual['content'] == content + assert msg["reasoning_content"] == think_content + assert msg["content"].endswith(content) # since msg contains all accumulated content + assert msg_manual["reasoning_content"] == think_content + assert msg_manual["content"] == content @@ -303,15 +282,15 @@ def test_incremental_phi4_reason_parser_nostreamer(answer): stream_string = re.split(r"(\s+)", answer) - accumulated_msg = {} + accumulated_message = {} for subword in stream_string: - msg = {} # msg when the first parser is called should be empty - parser.parse(msg, subword) - concatenate_dicts(accumulated_msg, msg) + delta_message = {} # msg when the first parser is called should be empty + parser.parse(delta_message, subword) + concatenate_dicts(accumulated_message, delta_message) think_content = answer.split("")[0].replace("", "") - assert accumulated_msg['reasoning_content'] == think_content + assert accumulated_message["reasoning_content"] == think_content @pytest.mark.parametrize("keep_original_content", [True, False]) @@ -348,15 +327,14 @@ def write(self, message): if do_reset: # If has been reset, check that content is parsed correctly - assert msg['reasoning_content'] == think_content - assert msg['content'] == (answer if keep_original_content else "\n\nThe answer to 2 + 1 is \boxed{3}.") + assert msg["reasoning_content"] == think_content + assert msg["content"] == (answer if keep_original_content else "\n\nThe answer to 2 + 1 is \boxed{3}.") else: - # If has not been reset(), then content msg['content'] will continue to accumulate thinking parts from the next runs - assert msg['content'].find("") >= 0 + # If has not been reset(), then content 
msg["content"] will continue to accumulate thinking parts from the next runs + assert msg["content"].find("") >= 0 def test_incremental_deepseek_parser(): - msg = {} stream_string = [ "<|begin▁of▁sentence|>", "First", ",", " I", " recognize", " that", " the", " question", " is", " asking", " for", " the", " sum", " of", " ", "2", " and", " ", "1", ".\n\n", "I", " know", " that", " addition", @@ -370,16 +348,15 @@ def test_incremental_deepseek_parser(): full_str = ''.join(stream_string) think_content = full_str.split("")[0] - content = full_str.split("")[1] - msg = {} - accumulated_msg = {} + delta_message = {} + accumulated_message = {} parser = DeepSeekR1ReasoningIncrementalParser() for subword in stream_string: - parser.parse(msg, subword) - concatenate_dicts(accumulated_msg, msg) + parser.parse(delta_message, subword) + concatenate_dicts(accumulated_message, delta_message) - assert accumulated_msg['reasoning_content'] == think_content + assert accumulated_message["reasoning_content"] == think_content @pytest.mark.parametrize( @@ -393,26 +370,21 @@ def test_custom_incremental_parser(hf_ov_genai_models): class CustomParser(IncrementalParser): main_part_started: bool = False - def parse(self, msg: dict, delta_text: str, delta_tokens = None) -> str: - if 'content' not in msg: - msg['content'] = '' - if 'main_text' not in msg: - msg['main_text'] = '' - + def parse(self, delta_message: dict, delta_text: str, delta_tokens = None) -> str: if not self.main_part_started and delta_text == '': self.main_part_started = True elif self.main_part_started and delta_text == '': self.main_part_started = False else: if self.main_part_started: - msg['main_text'] += delta_text - msg['content'] += delta_text + delta_message["main_text"] = delta_text + delta_message["content"] = delta_text return delta_text - msg = {} + accumulated_message = {} class CustomStreamer(TextParserStreamer): - def write(self, message): - concatenate_dicts(msg, message) + def write(self, delta_message): + 
concatenate_dicts(accumulated_message, delta_message) return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[CustomParser()]) @@ -420,7 +392,7 @@ def write(self, message): for subword in stream_string: streamer._write(subword) - assert msg['main_text'] == " world " + assert accumulated_message["main_text"] == " world " @pytest.mark.parametrize( @@ -438,7 +410,7 @@ def test_final_parser_llama_32_json(hf_ov_genai_models): parser = Llama3JsonToolParser() parser.parse(content_json) - assert content_json['tool_calls'][0] == json.loads(json_str) + assert content_json["tool_calls"][0] == json.loads(json_str) @pytest.mark.parametrize("model_id", ["microsoft/Phi-4-mini-reasoning"]) @@ -446,13 +418,12 @@ def test_final_parser_llama_32_json(hf_ov_genai_models): def test_custom_parser(tmp_path, model_id): _, _, models_path = download_and_convert_model(model_id, padding_side="left") pipe = create_ov_pipeline(models_path) - tok = pipe.get_tokenizer() class CustomParser(Parser): def parse(self, msg: dict): content = None if 'content' in msg: - content = msg['content'] + content = msg["content"] if not content: return @@ -461,7 +432,7 @@ def parse(self, msg: dict): think_end = content.find("") if think_start != -1 and think_end != -1 and think_end > think_start: think_text = content[think_start + len(""):think_end].strip() - msg['reasoning_content'] = think_text + msg["reasoning_content"] = think_text parser = CustomParser() config = GenerationConfig() @@ -478,8 +449,8 @@ def parse(self, msg: dict): think_text = content[think_start + len(""):think_end].strip() assert 'reasoning_content' in res.parsed[0] - assert res.parsed[0]['reasoning_content'] != "" - assert res.parsed[0]['reasoning_content'] == think_text + assert res.parsed[0]["reasoning_content"] != "" + assert res.parsed[0]["reasoning_content"] == think_text @pytest.mark.parametrize("model_id", ["microsoft/Phi-4-mini-reasoning"]) @@ -505,8 +476,8 @@ def write(self, message): think_text = 
content[think_start + len(""):think_end] assert 'reasoning_content' in res.parsed[0] - assert res.parsed[0]['reasoning_content'] != "" - assert res.parsed[0]['reasoning_content'] == think_text + assert res.parsed[0]["reasoning_content"] != "" + assert res.parsed[0]["reasoning_content"] == think_text res_streamer_1 = pipe.generate([prompt], max_new_tokens=600, streamer=streamer) res_streamer_2 = pipe.generate([prompt], max_new_tokens=600, streamer=streamer) From 3a8e30238f902f8eb7ea321b30f92e8ee2c7094f Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Fri, 28 Nov 2025 00:36:54 +0100 Subject: [PATCH 5/8] Update src/cpp/src/parsers.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/cpp/src/parsers.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cpp/src/parsers.cpp b/src/cpp/src/parsers.cpp index 082520a2ca..15f944a781 100644 --- a/src/cpp/src/parsers.cpp +++ b/src/cpp/src/parsers.cpp @@ -176,8 +176,9 @@ class ReasoningIncrementalParser::ReasoningParserImpl { // Think tag was not opened yet and not found in the current delta_text. // Accumulate text in the cache to detect if is split between several delta_text pieces. m_text_cache += delta_text; + // Intentionally clear delta_text: no delta content is returned to the user during this phase + // (we are waiting for the tag to be fully detected in the cache). 
delta_text.clear(); - } return delta_text; } From ae2f6625ad1d52d895fd92f6f7c0bc3243e55697 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Fri, 28 Nov 2025 00:40:04 +0100 Subject: [PATCH 6/8] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/cpp/src/json_container.cpp | 2 +- src/cpp/src/parsers.cpp | 4 +++- tests/cpp/parser.cpp | 5 ----- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/cpp/src/json_container.cpp b/src/cpp/src/json_container.cpp index 797bff67bf..d1f951dcaf 100644 --- a/src/cpp/src/json_container.cpp +++ b/src/cpp/src/json_container.cpp @@ -421,7 +421,7 @@ void JsonContainer::concatenate(JsonContainer& dst, const JsonContainer& src) { continue; } - OPENVINO_ASSERT(src_val.is_string(), "JsonContainer concatenate supports only string concatenation for object values."); + OPENVINO_ASSERT(src_val.is_string(), "JsonContainer concatenate supports only string concatenation for object values."); auto& dst_val = (*dst_)[it.key()]; dst_val = dst_val.get() + src_val.get(); } diff --git a/src/cpp/src/parsers.cpp b/src/cpp/src/parsers.cpp index 15f944a781..00642ee55b 100644 --- a/src/cpp/src/parsers.cpp +++ b/src/cpp/src/parsers.cpp @@ -114,7 +114,9 @@ class ReasoningIncrementalParser::ReasoningParserImpl { // Keep potential partial close tag in cache m_text_cache = std::string(txt_chunk.substr(txt_chunk.size() - num_chars_to_keep)); reason_str = txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep); - delta_text = std::string(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep)); + if (m_keep_original_content) { + delta_text = std::string(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep)); + } } else { // No partial close tag, accumulate all text reason_str = txt_chunk; diff --git a/tests/cpp/parser.cpp b/tests/cpp/parser.cpp index 90abc7e64c..af4be108c5 100644 --- a/tests/cpp/parser.cpp +++ b/tests/cpp/parser.cpp @@ -9,10 +9,6 @@ using namespace ov::genai; -// 
namespace ov::genai { -// void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector keys_to_concatenate); -// } - TEST(ParserTest, test_llama3_parser_1) { std::string prompt = R"(What's the weather in New York today?<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n[get_weather(location="New York, NY", unit="celsius")]<|eom_id|>)"; // By default content should keep original values. @@ -100,7 +96,6 @@ TEST_F(DeepSeekR1ReasoningParserTest, ReasoningContentAccumulatesAcrossCalls) { for (int i = 1; i < input_stream.size(); i++) { std::string delta_text = input_stream[i]; delta_text = parser.parse(msg, delta_text); - // concatenate_json_containers(msg, accumulated_msg, {"reasoning_content", "content"}); JsonContainer::concatenate(accumulated_msg, msg); } ASSERT_EQ(accumulated_msg["reasoning_content"], ref_res); From a8764fe74a938ce9df322d11f4b11ad793edff4a Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Fri, 28 Nov 2025 00:45:17 +0100 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/cpp/src/parsers.cpp | 2 +- tests/python_tests/test_parsers.py | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/cpp/src/parsers.cpp b/src/cpp/src/parsers.cpp index 00642ee55b..9283347220 100644 --- a/src/cpp/src/parsers.cpp +++ b/src/cpp/src/parsers.cpp @@ -181,7 +181,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl { // Intentionally clear delta_text: no delta content is returned to the user during this phase // (we are waiting for the tag to be fully detected in the cache). 
delta_text.clear(); - + } return delta_text; } diff --git a/tests/python_tests/test_parsers.py b/tests/python_tests/test_parsers.py index 7bb7c018dc..c1c58bf665 100644 --- a/tests/python_tests/test_parsers.py +++ b/tests/python_tests/test_parsers.py @@ -4,14 +4,14 @@ from utils.hugging_face import convert_and_save_tokenizer, download_and_convert_model from utils.ov_genai_pipelines import create_ov_pipeline import pytest -from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser +from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser from transformers import AutoTokenizer import re from io import StringIO def concatenate_dicts(dst_dict, src_dict): - # keys that exist in both dictionaries + # keys that exist in both dictionaries keys = set(dst_dict.keys()).intersection(set(src_dict.keys())) for key in keys: dst_dict[key] += src_dict[key] @@ -219,7 +219,6 @@ def write(self, delta_message): return StreamingStatus.RUNNING streamer = CustomStreamer(genai_tokenizer, parsers=[CustomIncrementalParser()]) - accumulated_message = {} # All closing tags , <|/inst|>, <|endoftext|>, etc. in tiny-random-phi3 add strange \x0c\x0c characters # so we avoid them in this test. answer = "\nOkay, the user is asking for the answer to 2 + 1.The answer to 2 + 1 is 3." 
@@ -265,7 +264,6 @@ def write(self, message): content = ''.join(split_answer) msg = streamer.get_parsed_message() - # breakpoint() assert msg["reasoning_content"] == think_content assert msg["content"].endswith(content) # since msg contains all accumulated content assert msg_manual["reasoning_content"] == think_content From 48a0b140dd77157e35c3557da30242b2c8400a2f Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Fri, 28 Nov 2025 12:59:30 +0100 Subject: [PATCH 8/8] apply review comments --- src/cpp/src/json_container.cpp | 10 ++++++++-- src/cpp/src/text_streamer.cpp | 4 ++-- tests/python_tests/test_parsers.py | 4 ++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/cpp/src/json_container.cpp b/src/cpp/src/json_container.cpp index d1f951dcaf..3d027f321f 100644 --- a/src/cpp/src/json_container.cpp +++ b/src/cpp/src/json_container.cpp @@ -415,14 +415,20 @@ void JsonContainer::concatenate(JsonContainer& dst, const JsonContainer& src) { for (auto it = src_->begin(); it != src_->end(); ++it) { const auto& src_val = it.value(); - + // Check if both values are of string type only if need to concatenate them. + // Otherwise just write the source value to destination. Extra check is not needed. + if (!dst_->contains(it.key())) { (*dst_)[it.key()] = src_val; continue; } - OPENVINO_ASSERT(src_val.is_string(), "JsonContainer concatenate supports only string concatenation for object values."); auto& dst_val = (*dst_)[it.key()]; + OPENVINO_ASSERT( + src_val.is_string() && dst_val.is_string(), + "JsonContainer concatenate supports only string concatenation for object values. " + "Key: '", it.key(), "', src_val type: '", src_val.type_name(), "', dst_val type: '", dst_val.type_name(), "'." 
+ ); dst_val = dst_val.get() + src_val.get(); } } diff --git a/src/cpp/src/text_streamer.cpp b/src/cpp/src/text_streamer.cpp index 98aa63cfbb..9edfb0ca6e 100644 --- a/src/cpp/src/text_streamer.cpp +++ b/src/cpp/src/text_streamer.cpp @@ -181,8 +181,8 @@ CallbackTypeVariant TextParserStreamer::write(std::string delta_text) { } // Every time we start to cycle through iterative parsers we create a new delta_message. - // Parsers should neither delete fields nor rewrite they should only append or add new fields. - // The only field is updated automaticall is "content": delta_text is put there. + // Parsers should neither delete fields nor rewrite; they should only append or add new fields. + // The only field that is updated automatically is "content": delta_text is put there. JsonContainer delta_message; // Iterate over all parsers and apply them to the message for (auto& parser: m_pimpl->m_parsers) { diff --git a/tests/python_tests/test_parsers.py b/tests/python_tests/test_parsers.py index 7498adaabb..6cf8d6aa28 100644 --- a/tests/python_tests/test_parsers.py +++ b/tests/python_tests/test_parsers.py @@ -4,7 +4,7 @@ from utils.hugging_face import convert_and_save_tokenizer, download_and_convert_model from utils.ov_genai_pipelines import create_ov_pipeline import pytest -from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser +from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser from transformers import AutoTokenizer import re from io import StringIO @@ -464,7 +464,7 @@ def write(self, message): streamer = CustomStreamer(tok, parsers=[Phi4ReasoningIncrementalParser()]) prompt = "Please 
say \"hello\"" - res = pipe.generate([prompt], max_new_tokens=600, parsers=[ReasoningParser(keep_original_content=False)]) + res = pipe.generate([prompt], max_new_tokens=600, parsers=[Phi4ReasoningParser()]) # extract manually reasoning content from the parsed result content = res.texts[0]