Skip to content

Streaming

Streaming support allows incremental processing of LLM outputs, providing real-time feedback to users.

Overview

All modules support streaming out of the box through the astream() method:

from udspy import Predict, OutputStreamChunk, Prediction

predictor = Predict("question -> answer")

async for event in predictor.astream(question="Explain AI"):
    if isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)
    elif isinstance(event, Prediction):
        print(f"\n\nFinal result: {event.answer}")

Stream Events

The streaming API yields two types of events:

OutputStreamChunk

Incremental text updates for a specific field:

class OutputStreamChunk(StreamEvent):
    module: Module          # Module that generated this chunk
    field_name: str        # Which output field (e.g., "answer")
    delta: str             # New text since last chunk
    content: str           # Full accumulated text so far
    is_complete: bool      # Whether field is done streaming

Example:

async for event in predictor.astream(question="..."):
    if isinstance(event, OutputStreamChunk):
        print(f"[{event.field_name}] {event.delta}", end="", flush=True)
        if event.is_complete:
            print(f"\n--- {event.field_name} complete ---")

Prediction

Final result with all output fields:

class Prediction(StreamEvent, dict):
    module: Module | None   # Module that produced this prediction
    is_final: bool          # True for final result, False for intermediate
    # Dict with all output fields
    # Supports both dict and attribute access

The module and is_final attributes help distinguish predictions in nested module scenarios:

async for event in predictor.astream(question="..."):
    if isinstance(event, Prediction):
        # Access output fields
        print(f"Answer: {event.answer}")
        print(f"Same: {event['answer']}")

        # Check which module produced this
        module_name = event.module.__class__.__name__
        print(f"From: {module_name}")

        # Distinguish final vs intermediate predictions
        if event.is_final:
            print("This is the final result!")

Field-Specific Streaming

Streaming automatically handles multiple output fields:

from udspy import ChainOfThought

cot = ChainOfThought("question -> answer")

async for event in cot.astream(question="What is 157 * 234?"):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            print(f"💭 {event.delta}", end="", flush=True)
        elif event.field_name == "answer":
            print(f"\n✓ {event.delta}", end="", flush=True)
    elif isinstance(event, Prediction):
        print(f"\n\nComplete!")

Custom Stream Events

You can emit custom events from tools or callbacks:

from dataclasses import dataclass
from udspy.streaming import StreamEvent, emit_event

@dataclass
class ToolProgress(StreamEvent):
    tool_name: str
    message: str
    progress: float  # 0.0 to 1.0

# In your tool:
from udspy import tool
from pydantic import Field

@tool(name="search")
async def search(query: str = Field(...)) -> str:
    emit_event(ToolProgress("search", "Starting search...", 0.0))

    results = await search_api(query)

    emit_event(ToolProgress("search", "Processing results...", 0.5))

    processed = process_results(results)

    emit_event(ToolProgress("search", "Complete!", 1.0))

    return processed

# In the stream consumer:
async for event in predictor.astream(question="..."):
    if isinstance(event, ToolProgress):
        print(f"[{event.tool_name}] {event.message} ({event.progress*100:.0f}%)")
    elif isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)

Module Support

All built-in modules support streaming:

Predict

predictor = Predict("question -> answer")
async for event in predictor.astream(question="..."):
    ...

ChainOfThought

Streams both reasoning and answer:

cot = ChainOfThought("question -> answer")
async for event in cot.astream(question="..."):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            # Reasoning streams first
            ...
        elif event.field_name == "answer":
            # Answer streams second
            ...

ReAct

Streams reasoning and tool interactions:

from udspy import ReAct

agent = ReAct("question -> answer", tools=[search])
async for event in agent.astream(question="..."):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            print(f"💭 {event.delta}", end="", flush=True)
    elif isinstance(event, Prediction):
        print(f"\n\n✓ {event.answer}")

See examples/react_streaming.py for a complete example.

Nested Modules

When modules compose other modules (e.g., ReAct using ChainOfThought), predictions from both parent and child modules are streamed. Use the module and is_final attributes to distinguish them:

from udspy import ReAct, ChainOfThought

agent = ReAct("question -> answer", tools=[calculator])

async for event in agent.astream(question="What is 157 * 234?"):
    if isinstance(event, Prediction):
        module_name = event.module.__class__.__name__ if event.module else "Unknown"

        if event.is_final:
            # This is the final result from the top-level ReAct module
            print(f"Final answer from {module_name}: {event.answer}")
        else:
            # This is an intermediate prediction from a nested module
            # (e.g., ChainOfThought extraction step)
            print(f"Intermediate result from {module_name}")

See examples/stream_with_module_info.py for a complete example.

Implementation Details

Context Variables

Streaming uses Python's contextvars for thread-safe event queuing:

from udspy.streaming import _stream_queue, emit_event

# Internal: stream queue is set when aexecute(stream=True) is called
# emit_event() checks if a queue exists and puts events there

This allows tools and nested modules to emit events without explicit queue passing.

Event Flow

  1. Module's astream(**inputs) is called
  2. Queue is created and set in context
  3. Internal aexecute(stream=True) is called
  4. Module yields StreamChunk events as text arrives
  5. Tools can call emit_event() to inject custom events
  6. Module yields final Prediction when complete
  7. Queue is cleaned up

Non-Streaming Mode

For non-streaming execution, use aforward() instead of astream():

# Streaming: iterate over events
async for event in predictor.astream(question="..."):
    if isinstance(event, Prediction):
        result = event

# Non-streaming: get final result directly
result = await predictor.aforward(question="...")

Best Practices

1. Always Handle Both Event Types

async for event in module.astream(**inputs):
    match event:
        case OutputStreamChunk():
            # Handle streaming text
            print(event.delta, end="", flush=True)
        case Prediction():
            # Handle final result
            final_result = event

2. Check Field Names for Multi-field Outputs

async for event in module.astream(**inputs):
    if isinstance(event, OutputStreamChunk):
        if event.field_name == "reasoning":
            # Different formatting for reasoning
            print(f"💭 {event.delta}", end="")
        elif event.field_name == "answer":
            # Different formatting for answer
            print(f"✓ {event.delta}", end="")

3. Use Custom Events for Progress

@dataclass
class Progress(StreamEvent):
    step: str
    percent: float

async def long_running_tool():
    emit_event(Progress("Loading data", 0.3))
    data = load_data()

    emit_event(Progress("Processing", 0.6))
    result = process(data)

    emit_event(Progress("Complete", 1.0))
    return result

4. Accumulate Chunks for Display

accumulated = {}

async for event in module.astream(**inputs):
    if isinstance(event, OutputStreamChunk):
        field = event.field_name
        if field not in accumulated:
            accumulated[field] = ""
        accumulated[field] += event.delta

        # Update UI with accumulated[field]
        update_display(field, accumulated[field])

Performance Considerations

Latency

Streaming reduces perceived latency by showing results immediately:

  • Non-streaming: Wait for full response (~5s), then show all text
  • Streaming: Start showing text after ~500ms, continue as it arrives

Token Usage

Streaming doesn't affect token usage - same number of tokens are generated.

Error Handling

Errors can occur mid-stream:

try:
    async for event in module.astream(**inputs):
        if isinstance(event, OutputStreamChunk):
            print(event.delta, end="", flush=True)
except Exception as e:
    print(f"\n\nError during streaming: {e}")

See Also