Skip to content

API Reference: Streaming

udspy.streaming

Streaming support with event queue for incremental LLM outputs and tool updates.

Classes

OutputStreamChunk

Bases: StreamChunk

A chunk of streamed LLM output for a specific field.

Attributes:

Name Type Description
field_name str

Name of the output field

delta str

Incremental content for this field (new text since last chunk)

content str

Full accumulated content for this field so far

is_complete bool

Whether this field is finished streaming

Source code in src/udspy/streaming.py
class OutputStreamChunk(StreamChunk):
    """A chunk of streamed LLM output for a specific field.

    Attributes:
        field_name: Name of the output field
        delta: Incremental content for this field (new text since last chunk)
        content: Full accumulated content for this field so far
        is_complete: Whether this field is finished streaming
    """

    pass

Prediction

Bases: StreamEvent, dict[str, Any]

Final prediction result with attribute access.

This is both a StreamEvent (can be yielded from astream) and a dict (for convenient attribute access to outputs).

Attributes:

Name Type Description
module

The module that produced this prediction

native_tool_calls

Tool calls from native LLM response (if any)

Example
pred = Prediction(answer="Paris", reasoning="France's capital")
print(pred.answer)  # "Paris"
print(pred["answer"])  # "Paris"
print(pred.is_final)  # True for top-level result
print(pred.module)  # Module instance that produced this
Source code in src/udspy/streaming.py
class Prediction(StreamEvent, dict[str, Any]):
    """Final prediction result with attribute access.

    This is both a StreamEvent (can be yielded from astream) and a dict
    (for convenient attribute access to outputs).

    Attributes:
        module: The module that produced this prediction
        native_tool_calls: Tool calls from native LLM response (if any)

    Example:
        ```python
        pred = Prediction(answer="Paris", reasoning="France's capital")
        print(pred.answer)  # "Paris"
        print(pred["answer"])  # "Paris"
        print(pred.is_final)  # True for top-level result
        print(pred.module)  # Module instance that produced this
        ```
    """

    def __init__(
        self,
        /,
        module: "Module | None" = None,
        native_tool_calls: list["ToolCall"] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.module = module
        self.native_tool_calls = native_tool_calls

    @property
    def is_final(self) -> bool:
        """Whether this is the final prediction (no pending tool calls)."""
        return bool(len(self.keys()) and not self.native_tool_calls)

    def __getattr__(self, name: str) -> Any:
        try:
            return self[name]
        except KeyError:
            raise AttributeError(f"Prediction has no attribute '{name}'") from None

    def __setattr__(self, name: str, value: Any) -> None:
        self[name] = value
Attributes
is_final property

Whether this is the final prediction (no pending tool calls).

StreamChunk

Bases: StreamEvent

A chunk of streamed output from a Module.

Source code in src/udspy/streaming.py
class StreamChunk(StreamEvent):
    """A chunk of streamed output from a Module."""

    module: "Module"
    field_name: str
    delta: str
    content: str
    is_complete: bool

    def __init__(
        self,
        module: "Module",
        field_name: str,
        delta: str,
        content: str,
        is_complete: bool,
    ) -> None:
        self.module = module
        self.field_name = field_name
        self.delta = delta
        self.content = content
        self.is_complete = is_complete

    def __repr__(self) -> str:
        status = "complete" if self.is_complete else "streaming"
        return (
            f"{self.__class__.__name__}(field={self.field_name}, "
            f"status={status}, delta={self.delta!r}, content={self.content!r})"
        )

StreamEvent

Base class for all stream events.

Users can define custom event types by inheriting from this class. The only built-in events are OutputStreamChunk and Prediction.

Example
from dataclasses import dataclass
from udspy.streaming import StreamEvent, emit_event

@dataclass
class ToolProgress(StreamEvent):
    tool_name: str
    message: str
    progress: float  # 0.0 to 1.0

# In your tool:
async def my_tool():
    emit_event(ToolProgress("search", "Searching...", 0.5))
Source code in src/udspy/streaming.py
class StreamEvent:
    """Base class for all stream events.

    Users can define custom event types by inheriting from this class.
    The only built-in events are OutputStreamChunk and Prediction.

    Example:
        ```python
        from dataclasses import dataclass
        from udspy.streaming import StreamEvent, emit_event

        @dataclass
        class ToolProgress(StreamEvent):
            tool_name: str
            message: str
            progress: float  # 0.0 to 1.0

        # In your tool:
        async def my_tool():
            emit_event(ToolProgress("search", "Searching...", 0.5))
        ```
    """

    pass

ThoughtStreamChunk

Bases: StreamChunk

A chunk of streamed reasoning output for a specific step.

Attributes:

Name Type Description
module Module

The module emitting this chunk

delta str

Incremental content for this step (new text since last chunk)

content str

Full accumulated content for this step so far

is_complete bool

Whether this step is finished streaming

Source code in src/udspy/streaming.py
class ThoughtStreamChunk(StreamChunk):
    """A chunk of streamed reasoning output for a specific step.

    Attributes:
        module: The module emitting this chunk

        delta: Incremental content for this step (new text since last chunk)
        content: Full accumulated content for this step so far
        is_complete: Whether this step is finished streaming
    """

    pass

Functions

emit_event(event)

Emit an event to the active stream.

This can be called from anywhere (tools, callbacks, etc.) to inject events into the current streaming context. If no stream is active, this is a no-op (silently ignored).

Parameters:

Name Type Description Default
event StreamEvent

The event to emit (any subclass of StreamEvent)

required
Example
from udspy.streaming import emit_event, StreamEvent
from dataclasses import dataclass

@dataclass
class ToolStatus(StreamEvent):
    message: str

async def my_tool():
    emit_event(ToolStatus("Starting search..."))
    result = await do_search()
    emit_event(ToolStatus("Search complete"))
    return result

# In the stream consumer:
async for event in predictor.astream(question="..."):
    if isinstance(event, ToolStatus):
        print(f"📊 {event.message}")
    elif isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)
Source code in src/udspy/streaming.py
def emit_event(event: StreamEvent) -> None:
    """Emit an event to the active stream.

    This can be called from anywhere (tools, callbacks, etc.) to inject
    events into the current streaming context. If no stream is active,
    this is a no-op (silently ignored).

    Args:
        event: The event to emit (any subclass of StreamEvent)

    Example:
        ```python
        from udspy.streaming import emit_event, StreamEvent
        from dataclasses import dataclass

        @dataclass
        class ToolStatus(StreamEvent):
            message: str

        async def my_tool():
            emit_event(ToolStatus("Starting search..."))
            result = await do_search()
            emit_event(ToolStatus("Search complete"))
            return result

        # In the stream consumer:
        async for event in predictor.astream(question="..."):
            if isinstance(event, ToolStatus):
                print(f"📊 {event.message}")
            elif isinstance(event, OutputStreamChunk):
                print(event.delta, end="", flush=True)
        ```
    """
    queue = _stream_queue.get()
    if queue is not None:
        queue.put_nowait(event)