Skip to content

Commit 30ba8c8

Browse files
authored
fix(responses): sync conversation before yielding terminal events in streaming (#3888)
Move the conversation sync logic before the `yield` to ensure it executes even when streaming consumers break early after receiving the `response.completed` event.

## Test Plan

```
OLLAMA_URL=http://localhost:11434 \
pytest -sv tests/integration/responses/ \
  --stack-config server:ci-tests \
  --text-model ollama/llama3.2:3b-instruct-fp16 \
  --inference-mode live \
  -k conversation_multi
```

This test now passes.
1 parent cb2185b commit 30ba8c8

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed

llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,14 +372,13 @@ async def _create_streaming_response(
372372
final_response = stream_chunk.response
373373
elif stream_chunk.type == "response.failed":
374374
failed_response = stream_chunk.response
375-
yield stream_chunk
376375

377376
if stream_chunk.type == "response.output_item.done":
378377
item = stream_chunk.item
379378
output_items.append(item)
380379

381-
# Store and sync immediately after yielding terminal events
382-
# This ensures the storage/syncing happens even if the consumer breaks early
380+
# Store and sync before yielding terminal events
381+
# This ensures the storage/syncing happens even if the consumer breaks after receiving the event
383382
if (
384383
stream_chunk.type in {"response.completed", "response.incomplete"}
385384
and final_response
@@ -400,6 +399,8 @@ async def _create_streaming_response(
400399
await self._sync_response_to_conversation(conversation, input, output_items)
401400
await self.responses_store.store_conversation_messages(conversation, messages_to_store)
402401

402+
yield stream_chunk
403+
403404
async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
404405
return await self.responses_store.delete_response_object(response_id)
405406

tests/integration/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def pytest_sessionstart(session):
4343

4444
if "SQLITE_STORE_DIR" not in os.environ:
4545
os.environ["SQLITE_STORE_DIR"] = tempfile.mkdtemp()
46+
logger.info(f"Setting SQLITE_STORE_DIR: {os.environ['SQLITE_STORE_DIR']}")
4647

4748
# Set test stack config type for api_recorder test isolation
4849
stack_config = session.config.getoption("--stack-config", default=None)

tests/integration/fixtures/common.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ def is_port_available(port: int, host: str = "localhost") -> bool:
4040

4141
def start_llama_stack_server(config_name: str) -> subprocess.Popen:
4242
"""Start a llama stack server with the given config."""
43-
cmd = f"uv run llama stack run {config_name}"
43+
44+
# remove server.log if it exists
45+
if os.path.exists("server.log"):
46+
os.remove("server.log")
47+
48+
cmd = f"llama stack run {config_name}"
4449
devnull = open(os.devnull, "w")
4550
process = subprocess.Popen(
4651
shlex.split(cmd),

0 commit comments

Comments (0)