# Streaming
Streaming support allows incremental processing of LLM outputs, providing real-time feedback to users.
## Overview
All modules support streaming out of the box through the `astream()` method:
```python
from udspy import Predict, OutputStreamChunk, Prediction

predictor = Predict("question -> answer")

async for event in predictor.astream(question="Explain AI"):
    if isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)
    elif isinstance(event, Prediction):
        print(f"\n\nFinal result: {event.answer}")
```
## Stream Events
The streaming API yields two types of events:
### OutputStreamChunk
Incremental text updates for a specific field:
```python
class OutputStreamChunk(StreamEvent):
    module: Module     # Module that generated this chunk
    field_name: str    # Which output field (e.g., "answer")
    delta: str         # New text since last chunk
    content: str       # Full accumulated text so far
    is_complete: bool  # Whether the field is done streaming
```
Example:
```python
async for event in predictor.astream(question="..."):
    if isinstance(event, OutputStreamChunk):
        print(f"[{event.field_name}] {event.delta}", end="", flush=True)
        if event.is_complete:
            print(f"\n--- {event.field_name} complete ---")
```
### Prediction
Final result with all output fields:
```python
class Prediction(StreamEvent, dict):
    module: Module | None  # Module that produced this prediction
    is_final: bool         # True for the final result, False for intermediate
    # Dict with all output fields
    # Supports both dict and attribute access
```
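One way a `dict` subclass can expose both access styles (an illustrative sketch, not necessarily how udspy implements it):

```python
class AttrDict(dict):
    """Dict that also exposes its keys as attributes (illustrative)."""

    def __getattr__(self, name: str):
        try:
            return self[name]
        except KeyError as exc:
            raise AttributeError(name) from exc
```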
The `module` and `is_final` attributes help distinguish predictions in nested module scenarios:
```python
async for event in predictor.astream(question="..."):
    if isinstance(event, Prediction):
        # Access output fields (attribute or dict style)
        print(f"Answer: {event.answer}")
        print(f"Same: {event['answer']}")

        # Check which module produced this
        module_name = event.module.__class__.__name__
        print(f"From: {module_name}")

        # Distinguish final vs intermediate predictions
        if event.is_final:
            print("This is the final result!")
```
## Field-Specific Streaming
Streaming automatically handles multiple output fields:
```python
from udspy import ChainOfThought

cot = ChainOfThought("question -> answer")

async for event in cot.astream(question="What is 157 * 234?"):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            print(f"💠 {event.delta}", end="", flush=True)
        elif event.field_name == "answer":
            print(f"\n✓ {event.delta}", end="", flush=True)
    elif isinstance(event, Prediction):
        print("\n\nComplete!")
```
## Custom Stream Events
You can emit custom events from tools or callbacks:
```python
from dataclasses import dataclass

from udspy.streaming import StreamEvent, emit_event

@dataclass
class ToolProgress(StreamEvent):
    tool_name: str
    message: str
    progress: float  # 0.0 to 1.0
```

In your tool:

```python
from pydantic import Field

from udspy import tool

@tool(name="search")
async def search(query: str = Field(...)) -> str:
    emit_event(ToolProgress("search", "Starting search...", 0.0))
    results = await search_api(query)
    emit_event(ToolProgress("search", "Processing results...", 0.5))
    processed = process_results(results)
    emit_event(ToolProgress("search", "Complete!", 1.0))
    return processed
```

In the stream consumer:

```python
async for event in predictor.astream(question="..."):
    if isinstance(event, ToolProgress):
        print(f"[{event.tool_name}] {event.message} ({event.progress * 100:.0f}%)")
    elif isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)
```
## Module Support
All built-in modules support streaming:
### Predict

Streams each output field as text arrives (see the Overview example above).
### ChainOfThought
Streams both `reasoning` and `answer`:
```python
cot = ChainOfThought("question -> answer")

async for event in cot.astream(question="..."):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            # Reasoning streams first
            ...
        elif event.field_name == "answer":
            # Answer streams second
            ...
```
### ReAct
Streams reasoning and tool interactions:
```python
from udspy import ReAct

agent = ReAct("question -> answer", tools=[search])

async for event in agent.astream(question="..."):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            print(f"💠 {event.delta}", end="", flush=True)
    elif isinstance(event, Prediction):
        print(f"\n\n✓ {event.answer}")
```
See `examples/react_streaming.py` for a complete example.
## Nested Modules
When modules compose other modules (e.g., `ReAct` using `ChainOfThought`), predictions from both parent and child modules are streamed. Use the `module` and `is_final` attributes to distinguish them:
```python
from udspy import ReAct, ChainOfThought

agent = ReAct("question -> answer", tools=[calculator])

async for event in agent.astream(question="What is 157 * 234?"):
    if isinstance(event, Prediction):
        module_name = event.module.__class__.__name__ if event.module else "Unknown"
        if event.is_final:
            # This is the final result from the top-level ReAct module
            print(f"Final answer from {module_name}: {event.answer}")
        else:
            # This is an intermediate prediction from a nested module
            # (e.g., the ChainOfThought extraction step)
            print(f"Intermediate result from {module_name}")
```
See `examples/stream_with_module_info.py` for a complete example.
## Implementation Details
### Context Variables
Streaming uses Python's `contextvars` for thread-safe event queuing:
```python
from udspy.streaming import _stream_queue, emit_event

# Internal: the stream queue is set when aexecute(stream=True) is called;
# emit_event() checks whether a queue exists and puts events there
```
This allows tools and nested modules to emit events without explicit queue passing.
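A minimal sketch of how this pattern can be implemented (illustrative only; the names match the imports above, but the bodies here are assumptions, not udspy's actual code):

```python
import asyncio
from contextvars import ContextVar

# Holds the active stream's queue; None when no stream is running
_stream_queue: ContextVar[asyncio.Queue | None] = ContextVar(
    "_stream_queue", default=None
)

def emit_event(event) -> None:
    """Put an event on the active stream queue, if one exists.

    Safe to call from tools and nested modules; a no-op outside astream().
    """
    queue = _stream_queue.get()
    if queue is not None:
        queue.put_nowait(event)
```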
### Event Flow
1. The module's `astream(**inputs)` is called
2. A queue is created and set in the context
3. The internal `aexecute(stream=True)` is called
4. The module yields `OutputStreamChunk` events as text arrives
5. Tools can call `emit_event()` to inject custom events
6. The module yields the final `Prediction` when complete
7. The queue is cleaned up
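A rough sketch of this flow (illustrative only; assumes the `_stream_queue` context variable from the previous sketch and a module exposing `aexecute()`):

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream

async def astream(module, **inputs):
    queue: asyncio.Queue = asyncio.Queue()
    token = _stream_queue.set(queue)  # step 2: queue created and set in context
    try:
        async def run() -> None:
            try:
                # steps 3-6: chunks, custom events, and the final
                # Prediction all reach the queue via emit_event()
                await module.aexecute(stream=True, **inputs)
            finally:
                queue.put_nowait(_DONE)

        task = asyncio.create_task(run())
        while (event := await queue.get()) is not _DONE:
            yield event
        await task  # re-raise any error from the module
    finally:
        _stream_queue.reset(token)  # step 7: queue cleaned up
```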
## Non-Streaming Mode
For non-streaming execution, use `aforward()` instead of `astream()`:
```python
# Streaming: iterate over events
async for event in predictor.astream(question="..."):
    if isinstance(event, Prediction):
        result = event

# Non-streaming: get the final result directly
result = await predictor.aforward(question="...")
```
## Best Practices
### 1. Always Handle Both Event Types
```python
async for event in module.astream(**inputs):
    match event:
        case OutputStreamChunk():
            # Handle streaming text
            print(event.delta, end="", flush=True)
        case Prediction():
            # Handle the final result
            final_result = event
```
### 2. Check Field Names for Multi-field Outputs
```python
async for event in module.astream(**inputs):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            # Different formatting for reasoning
            print(f"💠 {event.delta}", end="")
        elif event.field_name == "answer":
            # Different formatting for answer
            print(f"✓ {event.delta}", end="")
```
### 3. Use Custom Events for Progress
```python
@dataclass
class Progress(StreamEvent):
    step: str
    percent: float

async def long_running_tool():
    emit_event(Progress("Loading data", 0.3))
    data = load_data()
    emit_event(Progress("Processing", 0.6))
    result = process(data)
    emit_event(Progress("Complete", 1.0))
    return result
```
### 4. Accumulate Chunks for Display
```python
accumulated = {}

async for event in module.astream(**inputs):
    if isinstance(event, OutputStreamChunk):
        field = event.field_name
        if field not in accumulated:
            accumulated[field] = ""
        accumulated[field] += event.delta

        # Update the UI with the text accumulated for this field
        update_display(field, accumulated[field])
```

Note that `event.content` already holds the accumulated text for a field, so you can also pass it to your display directly instead of summing deltas yourself.
## Performance Considerations
### Latency
Streaming reduces perceived latency by showing results immediately:

- **Non-streaming**: wait for the full response (~5s), then show all text
- **Streaming**: start showing text after ~500ms, continue as it arrives
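To see the difference yourself, measure time-to-first-chunk against total time (a hypothetical timing harness; `predictor` is built as in the Overview):

```python
import time

from udspy import OutputStreamChunk

async def measure(predictor, question: str) -> None:
    start = time.perf_counter()
    first_chunk_at = None
    async for event in predictor.astream(question=question):
        if first_chunk_at is None and isinstance(event, OutputStreamChunk):
            first_chunk_at = time.perf_counter() - start
    total = time.perf_counter() - start
    print(f"First chunk: {first_chunk_at:.2f}s, full response: {total:.2f}s")
```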
### Token Usage
Streaming doesn't affect token usage; the same number of tokens is generated either way.
## Error Handling
Errors can occur mid-stream:
```python
try:
    async for event in module.astream(**inputs):
        if isinstance(event, OutputStreamChunk):
            print(event.delta, end="", flush=True)
except Exception as e:
    print(f"\n\nError during streaming: {e}")
```
## See Also
- API: Streaming - Full API reference
- Examples: Streaming - Complete examples
- Examples: ReAct Streaming - Agent streaming
- Base Module - Module execution patterns