Skip to content

Streaming Examples

Learn how to use streaming to give users a more responsive experience.

Basic Streaming

import asyncio

from udspy import (
    InputField,
    OutputField,
    OutputStreamChunk,
    Prediction,
    Signature,
    StreamingPredict,
)

# Signature for a single question-in, answer-out task.
# NOTE(review): the class docstring is presumably used by udspy as the task
# instruction sent to the model — confirm before rewording it.
class QA(Signature):
    """Answer questions."""
    # Input field; supplied by callers as the `question=` keyword.
    question: str = InputField()
    # Output field produced by the model.
    answer: str = OutputField()

async def main():
    """Ask one question and echo the streamed output as it arrives."""
    qa_stream = StreamingPredict(QA).stream(question="What is AI?")

    async for event in qa_stream:
        # Only output chunks carry text to print; skip anything else the
        # stream yields.
        if not isinstance(event, OutputStreamChunk):
            continue
        # `delta` is the new incremental text; print it without a newline so
        # the pieces join into one continuous line.
        print(event.delta, end="", flush=True)


asyncio.run(main())

Multi-Field Streaming

Stream the reasoning and the answer as separate fields:

# Signature with two output fields, so reasoning and answer can be told apart
# while streaming (chunks are matched on their `field_name`).
# NOTE(review): the class docstring is presumably used by udspy as the task
# instruction sent to the model — confirm before rewording it.
class ReasonedQA(Signature):
    """Answer with reasoning."""
    # Input field; supplied by callers as the `question=` keyword.
    question: str = InputField()
    # Output field: the model's reasoning.
    reasoning: str = OutputField()
    # Output field: the final answer, also read from the final prediction.
    answer: str = OutputField()

async def main():
    """Stream a reasoned answer, labelling each output field once.

    Chunks for the ``reasoning`` and ``answer`` fields are printed as they
    arrive. The field's prefix is written only when the streamed field
    changes, so the incremental text reads as continuous prose instead of
    having the prefix repeated before every delta. The final ``Prediction``
    item is printed once at the end.
    """
    predictor = StreamingPredict(ReasonedQA)

    # Prefix written once, just before the first chunk of each field.
    prefixes = {"reasoning": "💭 ", "answer": "\n✓ "}
    current_field = None

    print("Question: What is the sum of first 10 primes?\n")

    async for item in predictor.stream(
        question="What is the sum of first 10 primes?"
    ):
        if isinstance(item, OutputStreamChunk):
            if item.field_name in prefixes:
                # Emit the label only on a field transition, not per delta.
                if item.field_name != current_field:
                    current_field = item.field_name
                    print(prefixes[current_field], end="", flush=True)
                print(item.delta, end="", flush=True)

            if item.is_complete:
                print()  # Newline after field completes

        elif isinstance(item, Prediction):
            print(f"\n\nFinal: {item.answer}")


asyncio.run(main())

Web Application Integration

Use streaming in a web application:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

def _sse_event(text: str) -> str:
    """Frame *text* as one Server-Sent Events message.

    A raw newline inside a ``data:`` line would end the field (and a blank
    line would end the event), so multi-line text is sent as one ``data:``
    line per line of text; the client re-joins them with ``"\\n"``. CR and
    CRLF are normalised to LF because SSE treats all three as line breaks.
    Text without line breaks is framed as ``data: <text>\\n\\n``.
    """
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    return "".join(f"data: {line}\n" for line in normalized.split("\n")) + "\n"


@app.get("/ask")
async def ask_question(question: str):
    """Stream the answer to *question* as a ``text/event-stream`` response.

    Args:
        question: The question text, taken from the query string.

    Returns:
        A ``StreamingResponse`` that emits one SSE message per incremental
        output chunk.
    """
    async def generate():
        predictor = StreamingPredict(QA)
        async for chunk in predictor.stream(question=question):
            if isinstance(chunk, OutputStreamChunk) and not chunk.is_complete:
                # chunk.delta contains the new incremental text
                # chunk.content contains the full accumulated text so far
                yield _sse_event(chunk.delta)

    return StreamingResponse(generate(), media_type="text/event-stream")

See the full example in the repository.