Skip to content

Commit 4771108

Browse files
Make it easier to do sync streaming (#8183)
1 parent 9f66570 commit 4771108

File tree

3 files changed

+50
-4
lines changed

3 files changed

+50
-4
lines changed

docs/docs/tutorials/streaming/index.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,3 +393,38 @@ Final output: Prediction(
393393
sum='9'
394394
)
395395
```
396+
397+
## Synchronous Streaming
398+
399+
By default calling a streamified DSPy program produces an async generator. In order to get back
400+
a sync generator, you can set the flag `async_streaming=False`:
401+
402+
403+
```python
404+
import os
405+
406+
import dspy
407+
408+
os.environ["OPENAI_API_KEY"] = "your_api_key"
409+
410+
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
411+
412+
predict = dspy.Predict("question->answer")
413+
414+
# Enable streaming for the 'answer' field
415+
stream_predict = dspy.streamify(
416+
predict,
417+
stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
418+
async_streaming=False,
419+
)
420+
421+
output = stream_predict(question="why did a chicken cross the kitchen?")
422+
423+
program_output = None
424+
for chunk in output:
425+
if isinstance(chunk, dspy.streaming.StreamResponse):
426+
print(chunk)
427+
elif isinstance(chunk, dspy.Prediction):
428+
program_output = chunk
429+
print(f"Program output: {program_output}")
430+
```

dspy/streaming/streamify.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def streamify(
3030
stream_listeners: Optional[List[StreamListener]] = None,
3131
include_final_prediction_in_output_stream: bool = True,
3232
is_async_program: bool = False,
33+
async_streaming: bool = True,
3334
) -> Callable[[Any, Any], Awaitable[Any]]:
3435
"""
3536
Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them
@@ -50,6 +51,8 @@ def streamify(
5051
still be included in the output stream even if this is `False`.
5152
is_async_program: Whether the program is async. If `False`, the program will be wrapped with `asyncify`,
5253
otherwise the program will be called with `acall`.
54+
async_streaming: Whether to return an async generator or a sync generator. If `False`, the streaming will be
55+
converted to a sync generator.
5356
5457
Returns:
5558
A function that takes the same arguments as the original program, but returns an async
@@ -169,7 +172,7 @@ async def generator(args, kwargs, stream: MemoryObjectSendStream):
169172

170173
await stream.send(prediction)
171174

172-
async def streamer(*args, **kwargs):
175+
async def async_streamer(*args, **kwargs):
173176
send_stream, receive_stream = create_memory_object_stream(16)
174177
async with create_task_group() as tg, send_stream, receive_stream:
175178
tg.start_soon(generator, args, kwargs, send_stream)
@@ -201,7 +204,15 @@ async def streamer(*args, **kwargs):
201204
yield value
202205
return
203206

204-
return streamer
207+
if async_streaming:
208+
return async_streamer
209+
else:
210+
211+
def sync_streamer(*args, **kwargs):
212+
output = async_streamer(*args, **kwargs)
213+
return apply_sync_streaming(output)
214+
215+
return sync_streamer
205216

206217

207218
def apply_sync_streaming(async_generator: AsyncGenerator) -> Generator:

tests/streaming/test_streaming.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,11 @@ def __call__(self, x: str, **kwargs):
249249
dspy.streaming.StreamListener(signature_field_name="judgement"),
250250
],
251251
include_final_prediction_in_output_stream=False,
252+
async_streaming=False,
252253
)
253254
output = program(x="why did a chicken cross the kitchen?")
254-
sync_output = dspy.streaming.apply_sync_streaming(output)
255255
all_chunks = []
256-
for value in sync_output:
256+
for value in output:
257257
if isinstance(value, dspy.streaming.StreamResponse):
258258
all_chunks.append(value)
259259

0 commit comments

Comments
 (0)