API Reference: Modules

udspy.module

Module package for composable LLM calls.

Classes

ChainOfThought

Bases: Module

Chain of Thought reasoning module.

Automatically adds a reasoning step before generating outputs. This encourages the LLM to think step-by-step, improving answer quality.

Example
class QA(Signature):
    '''Answer questions.'''
    question: str = InputField()
    answer: str = OutputField()

# Creates predictor with automatic reasoning
predictor = ChainOfThought(QA)
result = predictor(question="What is 2+2?")

print(result.reasoning)  # "Let's think step by step..."
print(result.answer)     # "4"
Source code in src/udspy/module/chain_of_thought.py
class ChainOfThought(Module):
    """Chain of Thought reasoning module.

    Automatically adds a reasoning step before generating outputs.
    This encourages the LLM to think step-by-step, improving answer quality.

    Example:
        ```python
        class QA(Signature):
            '''Answer questions.'''
            question: str = InputField()
            answer: str = OutputField()

        # Creates predictor with automatic reasoning
        predictor = ChainOfThought(QA)
        result = predictor(question="What is 2+2?")

        print(result.reasoning)  # "Let's think step by step..."
        print(result.answer)     # "4"
        ```
    """

    def __init__(
        self,
        signature: type[Signature] | str,
        *,
        reasoning_description: str = "Step-by-step reasoning process",
        tools: list[Tool] | None = None,
        model: str | None = None,
        adapter: ChatAdapter | None = None,
        **kwargs: Any,
    ):
        """Initialize a Chain of Thought module.

        Args:
            signature: Signature defining inputs and final outputs, or a string in
                      format "inputs -> outputs" (e.g., "question -> answer")
            reasoning_description: Description for the reasoning field
            model: Model name (overrides global default)
            tools: List of Pydantic tool models
            adapter: Custom adapter
            **kwargs: Additional arguments for chat completion (including callbacks)
        """
        if isinstance(signature, str):
            signature = Signature.from_string(signature)

        self.original_signature = signature
        self.reasoning_description = reasoning_description
        self.model = model
        self.adapter = adapter
        self.kwargs = kwargs

        # Initialize module with tools
        self.init_module(tools=tools)

    def init_module(self, tools: list[Any] | None = None) -> None:
        """Initialize or reinitialize ChainOfThought with new tools.

        Args:
            tools: New tools to initialize with
        """
        extended_signature = self._build_extended_signature()
        self._create_predictor(extended_signature, tools)

    def _build_extended_signature(self) -> type[Signature]:
        """Build extended signature with reasoning field.

        Returns:
            Signature with reasoning field prepended to outputs
        """
        signature = self.original_signature

        input_fields = {
            name: field.annotation for name, field in signature.get_input_fields().items()
        }
        output_fields = {
            name: field.annotation for name, field in signature.get_output_fields().items()
        }

        extended_outputs = {"reasoning": str, **output_fields}

        extended_signature = make_signature(
            input_fields,  # type: ignore[arg-type]
            extended_outputs,  # type: ignore[arg-type]
            signature.get_instructions(),
        )

        extended_signature.model_fields["reasoning"].description = self.reasoning_description

        return extended_signature

    def _create_predictor(self, signature: type[Signature], tools: list[Any] | None) -> None:
        """Create the internal Predict module.

        Args:
            signature: Extended signature with reasoning field
            tools: Tools to pass to Predict
        """
        self.predict = Predict(
            signature, tools=tools, model=self.model, adapter=self.adapter, **self.kwargs
        )

    @with_callbacks
    async def aexecute(self, *, stream: bool = False, **inputs: Any) -> Prediction:
        """Execute chain of thought prediction.

        Delegates to the wrapped Predict module's aexecute method, which will
        automatically emit streaming events if a queue is active.

        Args:
            stream: If True, request streaming from LLM provider
            **inputs: Input values matching the signature's input fields

        Returns:
            Prediction with reasoning and other output fields
        """
        return await self.predict.aexecute(stream=stream, **inputs)
Functions
__init__(signature, *, reasoning_description='Step-by-step reasoning process', tools=None, model=None, adapter=None, **kwargs)

Initialize a Chain of Thought module.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signature` | `type[Signature] \| str` | Signature defining inputs and final outputs, or a string in the format `"inputs -> outputs"` (e.g., `"question -> answer"`) | *required* |
| `reasoning_description` | `str` | Description for the reasoning field | `'Step-by-step reasoning process'` |
| `model` | `str \| None` | Model name (overrides the global default) | `None` |
| `tools` | `list[Tool] \| None` | List of Pydantic tool models | `None` |
| `adapter` | `ChatAdapter \| None` | Custom adapter | `None` |
| `**kwargs` | `Any` | Additional arguments for chat completion (including callbacks) | `{}` |
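For illustration, a minimal sketch of both construction styles. It assumes `ChainOfThought` is re-exported from the top-level `udspy` package, as the other names in these examples are:

```python
from udspy import ChainOfThought  # assumed top-level re-export

# String signature: shorthand for a Signature with one input and one output.
cot = ChainOfThought("question -> answer")

# Same signature, with a custom description for the injected reasoning field.
verbose_cot = ChainOfThought(
    "question -> answer",
    reasoning_description="Numbered, step-by-step reasoning",
)

result = cot(question="What is 2+2?")
print(result.reasoning)
print(result.answer)
```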
Source code in src/udspy/module/chain_of_thought.py
def __init__(
    self,
    signature: type[Signature] | str,
    *,
    reasoning_description: str = "Step-by-step reasoning process",
    tools: list[Tool] | None = None,
    model: str | None = None,
    adapter: ChatAdapter | None = None,
    **kwargs: Any,
):
    """Initialize a Chain of Thought module.

    Args:
        signature: Signature defining inputs and final outputs, or a string in
                  format "inputs -> outputs" (e.g., "question -> answer")
        reasoning_description: Description for the reasoning field
        model: Model name (overrides global default)
        tools: List of Pydantic tool models
        adapter: Custom adapter
        **kwargs: Additional arguments for chat completion (including callbacks)
    """
    if isinstance(signature, str):
        signature = Signature.from_string(signature)

    self.original_signature = signature
    self.reasoning_description = reasoning_description
    self.model = model
    self.adapter = adapter
    self.kwargs = kwargs

    # Initialize module with tools
    self.init_module(tools=tools)
aexecute(*, stream=False, **inputs) async

Execute chain of thought prediction.

Delegates to the wrapped Predict module's aexecute method, which will automatically emit streaming events if a queue is active.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `bool` | If True, request streaming from the LLM provider | `False` |
| `**inputs` | `Any` | Input values matching the signature's input fields | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Prediction with reasoning and other output fields |

Source code in src/udspy/module/chain_of_thought.py
@with_callbacks
async def aexecute(self, *, stream: bool = False, **inputs: Any) -> Prediction:
    """Execute chain of thought prediction.

    Delegates to the wrapped Predict module's aexecute method, which will
    automatically emit streaming events if a queue is active.

    Args:
        stream: If True, request streaming from LLM provider
        **inputs: Input values matching the signature's input fields

    Returns:
        Prediction with reasoning and other output fields
    """
    return await self.predict.aexecute(stream=stream, **inputs)
init_module(tools=None)

Initialize or reinitialize ChainOfThought with new tools.

Parameters:

Name Type Description Default
tools list[Any] | None

New tools to initialize with

None
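As a hedged sketch of reinitialization: swapping the tool set rebuilds the internal Predict module. The `search_tool` below is a hypothetical `@tool`-decorated function, not part of this API:

```python
cot = ChainOfThought("question -> answer")

# `search_tool` is hypothetical; any @tool-decorated function or Tool instance works.
cot.init_module(tools=[search_tool])
```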
Source code in src/udspy/module/chain_of_thought.py
def init_module(self, tools: list[Any] | None = None) -> None:
    """Initialize or reinitialize ChainOfThought with new tools.

    Args:
        tools: New tools to initialize with
    """
    extended_signature = self._build_extended_signature()
    self._create_predictor(extended_signature, tools)

ConfirmationRequired

Bases: Exception

Raised when human input is needed to proceed.

This exception pauses execution and allows modules to save state for resumption. It can be raised by:

- Tools decorated with `@confirm_first`
- Modules that need user input (e.g., `ask_to_user`)
- Custom code requiring human interaction

Attributes:

| Name | Description |
| --- | --- |
| `question` | The question being asked to the user |
| `confirmation_id` | Unique ID for this confirmation request |
| `tool_call` | Optional ToolCall information if raised by a tool |
| `context` | General-purpose context dictionary for module state |

Source code in src/udspy/confirmation.py
class ConfirmationRequired(Exception):
    """Raised when human input is needed to proceed.

    This exception pauses execution and allows modules to save state for resumption.
    It can be raised by:
    - Tools decorated with @confirm_first
    - Modules that need user input (e.g., ask_to_user)
    - Custom code requiring human interaction

    Attributes:
        question: The question being asked to the user
        confirmation_id: Unique ID for this confirmation request
        tool_call: Optional ToolCall information if raised by a tool
        context: General-purpose context dictionary for module state
    """

    def __init__(
        self,
        question: str,
        *,
        confirmation_id: str | None = None,
        tool_call: Optional["ToolCall"] = None,
        context: dict[str, Any] | None = None,
    ):
        """Initialize ConfirmationRequired exception.

        Args:
            question: Question to ask the user
            confirmation_id: Unique ID for this confirmation (auto-generated if not provided)
            tool_call: Optional tool call information
            context: Optional context dictionary for module-specific state
        """
        super().__init__(question)
        self.question = question
        self.confirmation_id = confirmation_id or str(uuid.uuid4())
        self.tool_call = tool_call
        self.context = context or {}
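A minimal sketch of raising and catching this exception, importing it from `udspy.confirmation` (the module shown above); the question and context values are illustrative:

```python
from udspy.confirmation import ConfirmationRequired

def delete_files() -> None:
    # Pause for human approval before a destructive action.
    raise ConfirmationRequired(
        "Really delete all files?",
        context={"path": "/tmp/sandbox"},  # illustrative module state
    )

try:
    delete_files()
except ConfirmationRequired as e:
    print(e.question)         # "Really delete all files?"
    print(e.confirmation_id)  # auto-generated UUID string
    print(e.context)          # {"path": "/tmp/sandbox"}
```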
Functions
__init__(question, *, confirmation_id=None, tool_call=None, context=None)

Initialize ConfirmationRequired exception.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `question` | `str` | Question to ask the user | *required* |
| `confirmation_id` | `str \| None` | Unique ID for this confirmation (auto-generated if not provided) | `None` |
| `tool_call` | `Optional[ToolCall]` | Optional tool call information | `None` |
| `context` | `dict[str, Any] \| None` | Optional context dictionary for module-specific state | `None` |
Source code in src/udspy/confirmation.py
def __init__(
    self,
    question: str,
    *,
    confirmation_id: str | None = None,
    tool_call: Optional["ToolCall"] = None,
    context: dict[str, Any] | None = None,
):
    """Initialize ConfirmationRequired exception.

    Args:
        question: Question to ask the user
        confirmation_id: Unique ID for this confirmation (auto-generated if not provided)
        tool_call: Optional tool call information
        context: Optional context dictionary for module-specific state
    """
    super().__init__(question)
    self.question = question
    self.confirmation_id = confirmation_id or str(uuid.uuid4())
    self.tool_call = tool_call
    self.context = context or {}

Module

Base class for all udspy modules.

Modules are composable async-first units. The core method is aexecute() which handles both streaming and non-streaming execution. Public methods astream() and aforward() are thin wrappers around aexecute().

Subclasses should implement aexecute() to define their behavior.

Example
# Async streaming (real-time)
async for event in module.astream(question="What is AI?"):
    if isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)
    elif isinstance(event, Prediction):
        result = event

# Async non-streaming
result = await module.aforward(question="What is AI?")

# Sync (for scripts, notebooks)
result = module(question="What is AI?")
result = module.forward(question="What is AI?")
Source code in src/udspy/module/base.py
class Module:
    """Base class for all udspy modules.

    Modules are composable async-first units. The core method is `aexecute()`
    which handles both streaming and non-streaming execution. Public methods
    `astream()` and `aforward()` are thin wrappers around `aexecute()`.

    Subclasses should implement `aexecute()` to define their behavior.

    Example:
        ```python
        # Async streaming (real-time)
        async for event in module.astream(question="What is AI?"):
            if isinstance(event, OutputStreamChunk):
                print(event.delta, end="", flush=True)
            elif isinstance(event, Prediction):
                result = event

        # Async non-streaming
        result = await module.aforward(question="What is AI?")

        # Sync (for scripts, notebooks)
        result = module(question="What is AI?")
        result = module.forward(question="What is AI?")
        ```
    """

    @with_callbacks
    async def aexecute(
        self, *, stream: bool = False, history: History | None = None, **inputs: Any
    ) -> Prediction:
        """Core execution method. Must be implemented by subclasses.

        This is the single implementation point for both streaming and non-streaming
        execution. It always returns a Prediction, and optionally emits StreamEvent
        objects to the active queue (if one exists in the context).

        Args:
            stream: If True, request streaming from LLM provider. If False, use
                non-streaming API calls.
            history: Optional History object for maintaining conversation state
            **inputs: Input values for the module

        Returns:
            Final Prediction object

        Behavior:
            - Checks for active stream queue via _stream_queue.get()
            - If queue exists: emits OutputStreamChunk and Prediction events
            - Always returns final Prediction (even in streaming mode)
            - This enables composability: nested modules emit events automatically

        Raises:
            NotImplementedError: If not implemented by subclass
        """
        raise NotImplementedError(f"{self.__class__.__name__} must implement aexecute() method")

    @abstractmethod
    def init_module(self, tools: list[Callable[..., Any]] | None = None) -> None:
        """Initialize or reinitialize the module with new tools.

        This method provides a way to completely reinitialize module state,
        including tools, tool schemas, and signatures. It's designed to be
        called from module callbacks that need to dynamically modify the
        module during execution.

        When implementing this method, subclasses should:
        1. Rebuild the tools dictionary
        2. Regenerate tool schemas (if applicable)
        3. Rebuild signatures with new tool descriptions (if applicable)
        4. Preserve built-in tools (if applicable)

        Args:
            tools: New tools to initialize with. Format depends on subclass:
                - Can be functions (will be wrapped in Tool)
                - Can be Tool instances
                - None means clear all non-built-in tools

        Example:
            ```python
            from udspy import module_callback

            @module_callback
            def add_tools(context):
                # Get current tools
                current = list(context.module.tools.values())

                # Add new tools
                new_tools = [weather_tool, calendar_tool]

                # Reinitialize module with all tools
                context.module.init_module(tools=current + new_tools)

                return "Added weather and calendar tools"
            ```

        Note:
            This method is typically called from within a module callback
            decorated with @module_callback. The callback receives a context
            object with access to the module instance.
        """
        raise NotImplementedError(f"{self.__class__.__name__} must implement init_module() method")

    async def astream(
        self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
    ) -> AsyncGenerator[StreamEvent]:
        """Async streaming method. Sets up queue and yields events.

        This method sets up the stream queue context, calls aexecute() with
        streaming enabled, and yields all events from the queue.

        Supports resuming from a ConfirmationRequired exception by providing
        resume_state. This enables streaming with confirmation handling.

        Args:
            resume_state: Optional ResumeState containing exception and user response.
                Can also be a raw ConfirmationRequired exception (will use "yes" as response).
            history: Optional History object for maintaining conversation state.
            **inputs: Input values for the module

        Yields:
            StreamEvent objects (OutputStreamChunk, Prediction, and custom events)
        """

        queue: asyncio.Queue[StreamEvent] = asyncio.Queue()
        token = _stream_queue.set(queue)

        try:
            task = asyncio.create_task(
                self.aexecute(stream=True, resume_state=resume_state, history=history, **inputs)
            )

            while True:
                # Prioritize consuming events - check queue FIRST
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=0.01)
                    yield event
                    continue  # Skip task.done() check, keep consuming
                except TimeoutError:
                    pass  # Queue empty, proceed to check task status

                # Only check task completion when queue is idle
                if task.done():
                    try:
                        await task
                    except Exception:
                        raise

                    # Drain any remaining events
                    while not queue.empty():
                        event = queue.get_nowait()
                        yield event
                    break

        finally:
            try:
                _stream_queue.reset(token)
            except (ValueError, LookupError):
                pass

    async def aforward(
        self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
    ) -> Prediction:
        """Async non-streaming method. Returns final result directly.

        This method calls aexecute() with streaming disabled. If called from
        within a streaming context (i.e., another module is streaming), events
        will still be emitted to the active queue.

        Supports resuming from a ConfirmationRequired exception by providing
        resume_state. This enables loop-based confirmation handling.

        Args:
            resume_state: Optional ResumeState containing exception and user response.
                Can also be a raw ConfirmationRequired exception (will use "yes" as response).
            history: Optional History object for maintaining conversation state.
            **inputs: Input values for the module

        Returns:
            Final Prediction object

        Example:
            ```python
            from udspy import ResumeState

            # Loop-based confirmation handling
            resume_state = None

            while True:
                try:
                    result = await agent.aforward(
                        question="Delete files",
                        resume_state=resume_state
                    )
                    break
                except ConfirmationRequired as e:
                    user_response = input(f"{e.question} (yes/no): ")
                    resume_state = ResumeState(e, user_response)
            ```
        """
        return await self.aexecute(
            stream=False, resume_state=resume_state, history=history, **inputs
        )

    def forward(
        self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
    ) -> Prediction:
        """Sync non-streaming method. Wraps aforward() with async_to_sync.

        This provides sync compatibility for scripts and notebooks. Cannot be
        called from within an async context (use aforward() instead).

        Supports resuming from a ConfirmationRequired exception by providing
        resume_state. This enables loop-based confirmation handling.

        Args:
            resume_state: Optional ResumeState containing exception and user response.
                Can also be a raw ConfirmationRequired exception (will use "yes" as response).
            history: Optional History object for maintaining conversation state.
            **inputs: Input values for the module (includes both input fields
                and any module-specific parameters like auto_execute_tools)

        Returns:
            Final Prediction object

        Raises:
            RuntimeError: If called from within an async context

        Example:
            ```python
            from udspy import ResumeState

            # Loop-based confirmation handling
            resume_state = None

            while True:
                try:
                    result = agent.forward(
                        question="Delete files",
                        resume_state=resume_state
                    )
                    break
                except ConfirmationRequired as e:
                    user_response = input(f"{e.question} (yes/no): ")
                    resume_state = ResumeState(e, user_response)
            ```
        """
        ensure_sync_context(f"{self.__class__.__name__}.forward")

        return run_async_with_context(
            self.aforward(resume_state=resume_state, history=history, **inputs)
        )

    def __call__(
        self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
    ) -> Prediction:
        """Sync convenience method. Calls forward().

        Supports resuming from a ConfirmationRequired exception by providing
        resume_state. This enables loop-based confirmation handling.

        Args:
            resume_state: Optional ResumeState containing exception and user response.
                Can also be a raw ConfirmationRequired exception (will use "yes" as response).
            history: Optional History object for maintaining conversation state.
            **inputs: Input values for the module

        Returns:
            Final Prediction object

        Example:
            ```python
            from udspy import ResumeState

            # Loop-based confirmation handling
            resume_state = None

            while True:
                try:
                    result = agent(
                        question="Delete files",
                        resume_state=resume_state
                    )
                    break
                except ConfirmationRequired as e:
                    user_response = input(f"{e.question} (yes/no): ")
                    resume_state = ResumeState(e, user_response)
            ```
        """
        return self.forward(resume_state=resume_state, history=history, **inputs)

    async def asuspend(self, exception: ConfirmationRequired) -> Any:
        """Async suspend execution and save state.

        Called when ConfirmationRequired is raised. Subclasses should override
        to save any module-specific state needed for resumption.

        Args:
            exception: The ConfirmationRequired exception that was raised

        Returns:
            Saved state (can be any type, will be passed to aresume)
        """
        # Default implementation returns the exception itself as state
        return exception

    def suspend(self, exception: ConfirmationRequired) -> Any:
        """Sync suspend execution and save state.

        Wraps asuspend() with async_to_sync.

        Args:
            exception: The ConfirmationRequired exception that was raised

        Returns:
            Saved state (can be any type, will be passed to resume)
        """
        ensure_sync_context(f"{self.__class__.__name__}.suspend")
        return run_async_with_context(self.asuspend(exception))

    async def aresume(self, user_response: str, saved_state: Any) -> Prediction:
        """Async resume execution after user input.

        Called to resume execution after a ConfirmationRequired exception.
        Subclasses must override to implement resumption logic.

        Args:
            user_response: The user's response. Can be:
                - "yes"/"y" to approve the action
                - "no"/"n" to reject the action
                - "feedback" to provide feedback for LLM re-reasoning
                - JSON string with "edit" to modify tool arguments
            saved_state: State returned from asuspend()

        Returns:
            Final Prediction object

        Raises:
            NotImplementedError: If not implemented by subclass
        """
        raise NotImplementedError(f"{self.__class__.__name__} must implement aresume() method")

    def resume(self, user_response: str, saved_state: Any) -> Prediction:
        """Sync resume execution after user input.

        Wraps aresume() with async_to_sync.

        Args:
            user_response: The user's response
            saved_state: State returned from suspend()

        Returns:
            Final Prediction object
        """
        ensure_sync_context(f"{self.__class__.__name__}.resume")
        return run_async_with_context(self.aresume(user_response, saved_state))
Functions
__call__(*, resume_state=None, history=None, **inputs)

Sync convenience method. Calls forward().

Supports resuming from a ConfirmationRequired exception by providing resume_state. This enables loop-based confirmation handling.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `resume_state` | `Any` | Optional ResumeState containing exception and user response. Can also be a raw ConfirmationRequired exception (will use "yes" as the response). | `None` |
| `history` | `History \| None` | Optional History object for maintaining conversation state. | `None` |
| `**inputs` | `Any` | Input values for the module | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Final Prediction object |

Example
from udspy import ResumeState

# Loop-based confirmation handling
resume_state = None

while True:
    try:
        result = agent(
            question="Delete files",
            resume_state=resume_state
        )
        break
    except ConfirmationRequired as e:
        user_response = input(f"{e.question} (yes/no): ")
        resume_state = ResumeState(e, user_response)
Source code in src/udspy/module/base.py
def __call__(
    self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
) -> Prediction:
    """Sync convenience method. Calls forward().

    Supports resuming from a ConfirmationRequired exception by providing
    resume_state. This enables loop-based confirmation handling.

    Args:
        resume_state: Optional ResumeState containing exception and user response.
            Can also be a raw ConfirmationRequired exception (will use "yes" as response).
        history: Optional History object for maintaining conversation state.
        **inputs: Input values for the module

    Returns:
        Final Prediction object

    Example:
        ```python
        from udspy import ResumeState

        # Loop-based confirmation handling
        resume_state = None

        while True:
            try:
                result = agent(
                    question="Delete files",
                    resume_state=resume_state
                )
                break
            except ConfirmationRequired as e:
                user_response = input(f"{e.question} (yes/no): ")
                resume_state = ResumeState(e, user_response)
        ```
    """
    return self.forward(resume_state=resume_state, history=history, **inputs)
aexecute(*, stream=False, history=None, **inputs) async

Core execution method. Must be implemented by subclasses.

This is the single implementation point for both streaming and non-streaming execution. It always returns a Prediction, and optionally emits StreamEvent objects to the active queue (if one exists in the context).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `bool` | If True, request streaming from the LLM provider. If False, use non-streaming API calls. | `False` |
| `history` | `History \| None` | Optional History object for maintaining conversation state | `None` |
| `**inputs` | `Any` | Input values for the module | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Final Prediction object |

Behavior:

- Checks for an active stream queue via `_stream_queue.get()`
- If a queue exists: emits OutputStreamChunk and Prediction events
- Always returns the final Prediction (even in streaming mode)
- This enables composability: nested modules emit events automatically

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | If not implemented by subclass |

Source code in src/udspy/module/base.py
@with_callbacks
async def aexecute(
    self, *, stream: bool = False, history: History | None = None, **inputs: Any
) -> Prediction:
    """Core execution method. Must be implemented by subclasses.

    This is the single implementation point for both streaming and non-streaming
    execution. It always returns a Prediction, and optionally emits StreamEvent
    objects to the active queue (if one exists in the context).

    Args:
        stream: If True, request streaming from LLM provider. If False, use
            non-streaming API calls.
        history: Optional History object for maintaining conversation state
        **inputs: Input values for the module

    Returns:
        Final Prediction object

    Behavior:
        - Checks for active stream queue via _stream_queue.get()
        - If queue exists: emits OutputStreamChunk and Prediction events
        - Always returns final Prediction (even in streaming mode)
        - This enables composability: nested modules emit events automatically

    Raises:
        NotImplementedError: If not implemented by subclass
    """
    raise NotImplementedError(f"{self.__class__.__name__} must implement aexecute() method")
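A minimal sketch of a subclass implementing `aexecute()`. It assumes `Module` and `Prediction` are importable from the top-level `udspy` package (as in the other examples) and that `Prediction` accepts output fields as keyword arguments:

```python
from typing import Any

from udspy import Module, Prediction  # assumed top-level re-exports

class Echo(Module):
    """Toy module that echoes its input back as the answer."""

    def init_module(self, tools: list[Any] | None = None) -> None:
        pass  # no tools or schemas to rebuild

    async def aexecute(
        self, *, stream: bool = False, history: Any = None, **inputs: Any
    ) -> Prediction:
        # Must always return a Prediction, even when stream=True; streaming
        # events would be emitted to the active queue here.
        return Prediction(answer=inputs.get("question", ""))  # assumed constructor

result = Echo()(question="What is AI?")
```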
aforward(*, resume_state=None, history=None, **inputs) async

Async non-streaming method. Returns final result directly.

This method calls aexecute() with streaming disabled. If called from within a streaming context (i.e., another module is streaming), events will still be emitted to the active queue.

Supports resuming from a ConfirmationRequired exception by providing resume_state. This enables loop-based confirmation handling.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `resume_state` | `Any` | Optional ResumeState containing exception and user response. Can also be a raw ConfirmationRequired exception (will use "yes" as the response). | `None` |
| `history` | `History \| None` | Optional History object for maintaining conversation state. | `None` |
| `**inputs` | `Any` | Input values for the module | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Final Prediction object |

Example
from udspy import ResumeState

# Loop-based confirmation handling
resume_state = None

while True:
    try:
        result = await agent.aforward(
            question="Delete files",
            resume_state=resume_state
        )
        break
    except ConfirmationRequired as e:
        user_response = input(f"{e.question} (yes/no): ")
        resume_state = ResumeState(e, user_response)
Source code in src/udspy/module/base.py
async def aforward(
    self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
) -> Prediction:
    """Async non-streaming method. Returns final result directly.

    This method calls aexecute() with streaming disabled. If called from
    within a streaming context (i.e., another module is streaming), events
    will still be emitted to the active queue.

    Supports resuming from a ConfirmationRequired exception by providing
    resume_state. This enables loop-based confirmation handling.

    Args:
        resume_state: Optional ResumeState containing exception and user response.
            Can also be a raw ConfirmationRequired exception (will use "yes" as response).
        history: Optional History object for maintaining conversation state.
        **inputs: Input values for the module

    Returns:
        Final Prediction object

    Example:
        ```python
        from udspy import ResumeState

        # Loop-based confirmation handling
        resume_state = None

        while True:
            try:
                result = await agent.aforward(
                    question="Delete files",
                    resume_state=resume_state
                )
                break
            except ConfirmationRequired as e:
                user_response = input(f"{e.question} (yes/no): ")
                resume_state = ResumeState(e, user_response)
        ```
    """
    return await self.aexecute(
        stream=False, resume_state=resume_state, history=history, **inputs
    )
aresume(user_response, saved_state) async

Async resume execution after user input.

Called to resume execution after a ConfirmationRequired exception. Subclasses must override to implement resumption logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_response` | `str` | The user's response: `"yes"`/`"y"` to approve the action, `"no"`/`"n"` to reject it, feedback text for LLM re-reasoning, or a JSON string with `"edit"` to modify tool arguments | *required* |
| `saved_state` | `Any` | State returned from `asuspend()` | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Final Prediction object |

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | If not implemented by subclass |

Source code in src/udspy/module/base.py
async def aresume(self, user_response: str, saved_state: Any) -> Prediction:
    """Async resume execution after user input.

    Called to resume execution after a ConfirmationRequired exception.
    Subclasses must override to implement resumption logic.

    Args:
        user_response: The user's response. Can be:
            - "yes"/"y" to approve the action
            - "no"/"n" to reject the action
            - "feedback" to provide feedback for LLM re-reasoning
            - JSON string with "edit" to modify tool arguments
        saved_state: State returned from asuspend()

    Returns:
        Final Prediction object

    Raises:
        NotImplementedError: If not implemented by subclass
    """
    raise NotImplementedError(f"{self.__class__.__name__} must implement aresume() method")
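A hedged sketch of overriding `aresume()`. The default `asuspend()` returns the exception itself, so `saved_state` is assumed here to be that exception; the keyword-argument `Prediction` constructor is also an assumption:

```python
from typing import Any

from udspy import Module, Prediction  # assumed top-level re-exports

class Confirming(Module):
    """Toy module that resolves a pending confirmation."""

    def init_module(self, tools: list[Any] | None = None) -> None:
        pass  # no tools to rebuild

    async def aresume(self, user_response: str, saved_state: Any) -> Prediction:
        # saved_state defaults to the ConfirmationRequired exception from asuspend().
        if user_response.strip().lower() in ("yes", "y"):
            return Prediction(status="approved")  # assumed constructor
        return Prediction(status="rejected")
```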
astream(*, resume_state=None, history=None, **inputs) async

Async streaming method. Sets up queue and yields events.

This method sets up the stream queue context, calls aexecute() with streaming enabled, and yields all events from the queue.

Supports resuming from a ConfirmationRequired exception by providing resume_state. This enables streaming with confirmation handling.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `resume_state` | `Any` | Optional ResumeState containing exception and user response. Can also be a raw ConfirmationRequired exception (will use "yes" as the response). | `None` |
| `history` | `History \| None` | Optional History object for maintaining conversation state. | `None` |
| `**inputs` | `Any` | Input values for the module | `{}` |

Yields:

| Type | Description |
| --- | --- |
| `AsyncGenerator[StreamEvent]` | StreamEvent objects (OutputStreamChunk, Prediction, and custom events) |

Source code in src/udspy/module/base.py
async def astream(
    self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
) -> AsyncGenerator[StreamEvent]:
    """Async streaming method. Sets up queue and yields events.

    This method sets up the stream queue context, calls aexecute() with
    streaming enabled, and yields all events from the queue.

    Supports resuming from a ConfirmationRequired exception by providing
    resume_state. This enables streaming with confirmation handling.

    Args:
        resume_state: Optional ResumeState containing exception and user response.
            Can also be a raw ConfirmationRequired exception (will use "yes" as response).
        history: Optional History object for maintaining conversation state.
        **inputs: Input values for the module

    Yields:
        StreamEvent objects (OutputStreamChunk, Prediction, and custom events)
    """

    queue: asyncio.Queue[StreamEvent] = asyncio.Queue()
    token = _stream_queue.set(queue)

    try:
        task = asyncio.create_task(
            self.aexecute(stream=True, resume_state=resume_state, history=history, **inputs)
        )

        while True:
            # Prioritize consuming events - check queue FIRST
            try:
                event = await asyncio.wait_for(queue.get(), timeout=0.01)
                yield event
                continue  # Skip task.done() check, keep consuming
            except TimeoutError:
                pass  # Queue empty, proceed to check task status

            # Only check task completion when queue is idle
            if task.done():
                try:
                    await task
                except Exception:
                    raise

                # Drain any remaining events
                while not queue.empty():
                    event = queue.get_nowait()
                    yield event
                break

    finally:
        try:
            _stream_queue.reset(token)
        except (ValueError, LookupError):
            pass
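A sketch combining `astream()` with confirmation handling, per the `resume_state` note above. It assumes `ConfirmationRequired` propagates out of the stream the way it does from `aforward()`, and imports `ResumeState` and `OutputStreamChunk` as the other examples do:

```python
from udspy import ResumeState  # as in the aforward() example
from udspy.confirmation import ConfirmationRequired
from udspy.streaming import OutputStreamChunk

async def run(agent):
    resume_state = None
    while True:
        try:
            async for event in agent.astream(
                question="Delete files", resume_state=resume_state
            ):
                if isinstance(event, OutputStreamChunk):
                    print(event.delta, end="", flush=True)
            break  # stream finished without needing confirmation
        except ConfirmationRequired as e:
            user_response = input(f"{e.question} (yes/no): ")
            resume_state = ResumeState(e, user_response)
```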
asuspend(exception) async

Async suspend execution and save state.

Called when ConfirmationRequired is raised. Subclasses should override to save any module-specific state needed for resumption.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `exception` | `ConfirmationRequired` | The ConfirmationRequired exception that was raised | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | Saved state (can be any type; will be passed to `aresume()`) |

Source code in src/udspy/module/base.py
async def asuspend(self, exception: ConfirmationRequired) -> Any:
    """Async suspend execution and save state.

    Called when ConfirmationRequired is raised. Subclasses should override
    to save any module-specific state needed for resumption.

    Args:
        exception: The ConfirmationRequired exception that was raised

    Returns:
        Saved state (can be any type, will be passed to aresume)
    """
    # Default implementation returns the exception itself as state
    return exception
forward(*, resume_state=None, history=None, **inputs)

Sync non-streaming method. Wraps aforward() with async_to_sync.

This provides sync compatibility for scripts and notebooks. Cannot be called from within an async context (use aforward() instead).

Supports resuming from a ConfirmationRequired exception by providing resume_state. This enables loop-based confirmation handling.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `resume_state` | `Any` | Optional ResumeState containing exception and user response. Can also be a raw ConfirmationRequired exception (will use "yes" as the response). | `None` |
| `history` | `History \| None` | Optional History object for maintaining conversation state. | `None` |
| `**inputs` | `Any` | Input values for the module (includes both input fields and any module-specific parameters like auto_execute_tools) | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Final Prediction object |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If called from within an async context |

Example
from udspy import ResumeState

# Loop-based confirmation handling
resume_state = None

while True:
    try:
        result = agent.forward(
            question="Delete files",
            resume_state=resume_state
        )
        break
    except ConfirmationRequired as e:
        user_response = input(f"{e.question} (yes/no): ")
        resume_state = ResumeState(e, user_response)
Source code in src/udspy/module/base.py
def forward(
    self, *, resume_state: Any = None, history: History | None = None, **inputs: Any
) -> Prediction:
    """Sync non-streaming method. Wraps aforward() with async_to_sync.

    This provides sync compatibility for scripts and notebooks. Cannot be
    called from within an async context (use aforward() instead).

    Supports resuming from a ConfirmationRequired exception by providing
    resume_state. This enables loop-based confirmation handling.

    Args:
        resume_state: Optional ResumeState containing exception and user response.
            Can also be a raw ConfirmationRequired exception (will use "yes" as response).
        history: Optional History object for maintaining conversation state.
        **inputs: Input values for the module (includes both input fields
            and any module-specific parameters like auto_execute_tools)

    Returns:
        Final Prediction object

    Raises:
        RuntimeError: If called from within an async context

    Example:
        ```python
        from udspy import ResumeState

        # Loop-based confirmation handling
        resume_state = None

        while True:
            try:
                result = agent.forward(
                    question="Delete files",
                    resume_state=resume_state
                )
                break
            except ConfirmationRequired as e:
                user_response = input(f"{e.question} (yes/no): ")
                resume_state = ResumeState(e, user_response)
        ```
    """
    ensure_sync_context(f"{self.__class__.__name__}.forward")

    return run_async_with_context(
        self.aforward(resume_state=resume_state, history=history, **inputs)
    )
init_module(tools=None) abstractmethod

Initialize or reinitialize the module with new tools.

This method provides a way to completely reinitialize module state, including tools, tool schemas, and signatures. It's designed to be called from module callbacks that need to dynamically modify the module during execution.

When implementing this method, subclasses should:

1. Rebuild the tools dictionary
2. Regenerate tool schemas (if applicable)
3. Rebuild signatures with new tool descriptions (if applicable)
4. Preserve built-in tools (if applicable)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tools` | `list[Callable[..., Any]] \| None` | New tools to initialize with. Format depends on the subclass: functions (wrapped in Tool), Tool instances, or None to clear all non-built-in tools | `None` |
Example
from udspy import module_callback

@module_callback
def add_tools(context):
    # Get current tools
    current = list(context.module.tools.values())

    # Add new tools
    new_tools = [weather_tool, calendar_tool]

    # Reinitialize module with all tools
    context.module.init_module(tools=current + new_tools)

    return "Added weather and calendar tools"
Note

This method is typically called from within a module callback decorated with @module_callback. The callback receives a context object with access to the module instance.

Source code in src/udspy/module/base.py
@abstractmethod
def init_module(self, tools: list[Callable[..., Any]] | None = None) -> None:
    """Initialize or reinitialize the module with new tools.

    This method provides a way to completely reinitialize module state,
    including tools, tool schemas, and signatures. It's designed to be
    called from module callbacks that need to dynamically modify the
    module during execution.

    When implementing this method, subclasses should:
    1. Rebuild the tools dictionary
    2. Regenerate tool schemas (if applicable)
    3. Rebuild signatures with new tool descriptions (if applicable)
    4. Preserve built-in tools (if applicable)

    Args:
        tools: New tools to initialize with. Format depends on subclass:
            - Can be functions (will be wrapped in Tool)
            - Can be Tool instances
            - None means clear all non-built-in tools

    Example:
        ```python
        from udspy import module_callback

        @module_callback
        def add_tools(context):
            # Get current tools
            current = list(context.module.tools.values())

            # Add new tools
            new_tools = [weather_tool, calendar_tool]

            # Reinitialize module with all tools
            context.module.init_module(tools=current + new_tools)

            return "Added weather and calendar tools"
        ```

    Note:
        This method is typically called from within a module callback
        decorated with @module_callback. The callback receives a context
        object with access to the module instance.
    """
    raise NotImplementedError(f"{self.__class__.__name__} must implement init_module() method")
resume(user_response, saved_state)

Sync resume execution after user input.

Wraps aresume() with async_to_sync.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_response` | `str` | The user's response | *required* |
| `saved_state` | `Any` | State returned from `suspend()` | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Final Prediction object |

Source code in src/udspy/module/base.py
def resume(self, user_response: str, saved_state: Any) -> Prediction:
    """Sync resume execution after user input.

    Wraps aresume() with async_to_sync.

    Args:
        user_response: The user's response
        saved_state: State returned from suspend()

    Returns:
        Final Prediction object
    """
    ensure_sync_context(f"{self.__class__.__name__}.resume")
    return run_async_with_context(self.aresume(user_response, saved_state))
suspend(exception)

Sync suspend execution and save state.

Wraps asuspend() with async_to_sync.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `exception` | `ConfirmationRequired` | The ConfirmationRequired exception that was raised | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | Saved state (can be any type; will be passed to `resume()`) |

Source code in src/udspy/module/base.py
def suspend(self, exception: ConfirmationRequired) -> Any:
    """Sync suspend execution and save state.

    Wraps asuspend() with async_to_sync.

    Args:
        exception: The ConfirmationRequired exception that was raised

    Returns:
        Saved state (can be any type, will be passed to resume)
    """
    ensure_sync_context(f"{self.__class__.__name__}.suspend")
    return run_async_with_context(self.asuspend(exception))

Predict

Bases: Module

Module for making LLM predictions based on a signature.

This is an async-first module. The core method is astream() which yields StreamEvent objects. Use aforward() for async non-streaming, or forward() for sync usage.

Example
from udspy import Predict, Signature, InputField, OutputField

class QA(Signature):
    '''Answer questions.'''
    question: str = InputField()
    answer: str = OutputField()

predictor = Predict(QA)

# Sync usage
result = predictor(question="What is 2+2?")
print(result.answer)

# Async non-streaming
result = await predictor.aforward(question="What is 2+2?")

# Async streaming
from udspy.streaming import OutputStreamChunk
async for event in predictor.astream(question="What is 2+2?"):
    if isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)
Source code in src/udspy/module/predict.py
class Predict(Module):
    """Module for making LLM predictions based on a signature.

    This is an async-first module. The core method is `astream()` which yields
    StreamEvent objects. Use `aforward()` for async non-streaming, or `forward()`
    for sync usage.

    Example:
        ```python
        from udspy import Predict, Signature, InputField, OutputField

        class QA(Signature):
            '''Answer questions.'''
            question: str = InputField()
            answer: str = OutputField()

        predictor = Predict(QA)

        # Sync usage
        result = predictor(question="What is 2+2?")
        print(result.answer)

        # Async non-streaming
        result = await predictor.aforward(question="What is 2+2?")

        # Async streaming
        from udspy.streaming import OutputStreamChunk
        async for event in predictor.astream(question="What is 2+2?"):
            if isinstance(event, OutputStreamChunk):
                print(event.delta, end="", flush=True)
        ```
    """

    def __init__(
        self,
        signature: type[Signature] | str,
        *,
        tools: list[Tool] | None = None,
        max_turns: int = 10,
        adapter: ChatAdapter | None = None,
        model: str | None = None,
        **kwargs: Any,
    ):
        """Initialize a Predict module.

        Args:
            signature: Signature defining inputs and outputs, or a string in
                      format "inputs -> outputs" (e.g., "question -> answer")
            model: Model name (overrides global default)
            tools: List of tool functions (decorated with @tool) or Pydantic models
            max_turns: Maximum number of LLM calls for tool execution loop (default: 10)
            adapter: Custom adapter (defaults to ChatAdapter)
            **kwargs: Additional arguments for chat completion (temperature, callbacks, etc.)
        """
        if isinstance(signature, str):
            signature = Signature.from_string(signature)

        self.signature = signature
        self._model = model
        self._kwargs = kwargs
        self.max_turns = max_turns
        if max_turns < 1:
            raise ValueError("max_turns must be at least 1")
        self.adapter = adapter or ChatAdapter()

        self.init_module(tools=tools)

    @property
    def model(self) -> str | None:
        """Get the model name override, or None to use LM's default."""
        return self._model

    @property
    def kwargs(self) -> dict[str, Any]:
        return {**settings.default_kwargs, **self._kwargs}

    def init_module(self, tools: list[Any] | None = None) -> None:
        """Initialize or reinitialize Predict with new tools.

        This method rebuilds the tools dictionary and regenerates tool schemas.
        It's designed to be called from module callbacks to dynamically modify
        available tools during execution.

        Args:
            tools: New tools to initialize with. Can be:
                - Functions decorated with @tool
                - Tool instances
                - None to clear all tools

        Example:
            ```python
            from udspy import module_callback

            @module_callback
            def add_specialized_tools(context):
                # Get current tools
                current_tools = list(context.module.tools.values())

                # Add new tools
                new_tools = [weather_tool, calendar_tool]

                # Reinitialize with all tools
                context.module.init_module(tools=current_tools + new_tools)

                return "Added weather and calendar tools"
            ```
        """
        self._init_tools(tools or [])

    def _init_tools(self, tools: list[Any]) -> None:
        """Initialize tools dictionary with provided tools.

        Args:
            tools: List of tools (functions or Tool instances)
        """
        tool_list = [t if isinstance(t, Tool) else Tool(t) for t in tools]
        self.tools = {tool.name: tool for tool in tool_list if tool.name}
        self._build_tool_schemas()

    def _build_tool_schemas(self) -> None:
        """Build OpenAI tool schemas from current tools."""
        self.tool_schemas = [self.adapter.format_tool_schema(tool) for tool in self.tools.values()]

    @suspendable
    @with_callbacks
    async def aexecute(
        self,
        *,
        stream: bool = False,
        auto_execute_tools: bool = True,
        history: History | None = None,
        **inputs: Any,
    ) -> Prediction:
        """Core execution method - handles both streaming and non-streaming.

        This is the single implementation point for LLM interaction. It always
        returns a Prediction, and emits events to the queue if one is active.

        Args:
            stream: If True, request streaming from OpenAI. If False, use regular API.
            auto_execute_tools: If True, automatically execute tools and continue.
                If False, return Prediction with tool_calls for manual handling.
            history: Optional History object for multi-turn conversations.
            **inputs: Input values matching the signature's input fields

        Returns:
            Final Prediction object (after all tool executions if auto_execute_tools=True)
        """
        if history is None:
            history = History()

        self._validate_inputs(inputs)
        self._build_initial_messages(inputs, history)

        return await self._aexecute(
            stream=stream,
            auto_execute_tools=auto_execute_tools,
            history=history,
        )

    def _validate_inputs(self, inputs: dict[str, Any]) -> None:
        """Validate that all required inputs are provided."""
        input_fields = self.signature.get_input_fields()
        for field_name in input_fields:
            if field_name not in inputs:
                raise ValueError(f"Missing required input field: {field_name}")

    def _build_initial_messages(self, inputs: dict[str, Any], history: History) -> None:
        """Build initial messages from inputs and optional history.

        Args:
            inputs: Input values from user
            history: History object with existing conversation
        """
        history.set_system_message(self.adapter.format_instructions(self.signature))
        history.add_user_message(self.adapter.format_user_request(self.signature, inputs))

    async def _aexecute(
        self,
        stream: bool,
        auto_execute_tools: bool,
        history: History,
    ) -> Prediction:
        """Execute multi-turn conversation with optional automatic tool execution.

        This is the core execution loop that handles both streaming and non-streaming.

        Args:
            stream: If True, request streaming from OpenAI
            auto_execute_tools: If True, automatically execute tools. If False,
                return after first tool call.
            history: Optional History object to update with conversation

        Returns:
            Final Prediction object
        """
        prediction: Prediction | None = None

        for turn in range(self.max_turns):
            prediction = await self._aexecute_one_turn(history.messages, turn, stream=stream)

            if not auto_execute_tools or not prediction.native_tool_calls:
                break

            await self._aexecute_tool_calls(prediction.native_tool_calls, history)
        else:
            if prediction is not None and not prediction.is_final:
                raise RuntimeError(f"Max turns ({self.max_turns}) reached without final answer")

        if prediction is None:
            raise RuntimeError("No prediction generated")

        self._update_history_with_prediction(history, prediction)
        return prediction

    async def _aexecute_one_turn(
        self, messages: list[dict[str, Any]], turn: int, stream: bool
    ) -> Prediction:
        """Execute one LLM turn (streaming or non-streaming).

        Args:
            messages: Conversation messages
            turn: Current turn number (0-indexed)
            stream: If True, request streaming from OpenAI

        Returns:
            Prediction object for this turn
        """
        completion_kwargs: dict[str, Any] = {
            "messages": messages,
            "stream": stream,
            "tools": self.tool_schemas,
            **self.kwargs,
        }

        # Only pass model if explicitly set (otherwise LM uses its default)
        if self.model is not None:
            completion_kwargs["model"] = self.model

        func = self._astream if stream else self._aforward
        return await func(completion_kwargs)

    async def _aexecute_tool_calls(
        self,
        native_tool_calls: list[ToolCall],
        history: History,
    ) -> None:
        """Execute tool calls and add results to messages.

        Args:
            tool_calls: List of tool calls to execute
            history: History object to update
        """
        history.add_assistant_message(
            tool_calls=[
                {
                    "id": tc.call_id,
                    "type": "function",
                    "function": {"name": tc.name, "arguments": json.dumps(tc.args)},
                }
                for tc in native_tool_calls
            ]
        )

        for tool_call in native_tool_calls:
            call_id = tool_call.call_id
            tool_name = tool_call.name
            tool_args = tool_call.args

            content: str = ""
            if tool_name in self.tools:
                try:
                    result = await self.tools[tool_name](**tool_args)

                    if is_module_callback(result):
                        context = PredictContext(module=self, history=history)
                        content = result(context)
                    elif isinstance(result, BaseModel):
                        content = result.model_dump_json()
                    elif not isinstance(result, str):
                        content = json.dumps(result)
                    else:
                        content = result
                except Exception as e:
                    content = f"Error executing tool: {e}"
            else:
                content = f"Error: Tool `{tool_name}` not found."
                available_tools = ", ".join(f"`{tool}`" for tool in self.tools.keys())
                if available_tools:
                    content += f" Available tools are: {available_tools}."
                else:
                    content += " No tools are currently available."

            history.add_tool_result(str(call_id), content)

    def _update_history_with_prediction(self, history: History, prediction: Prediction) -> None:
        """Update history with assistant's prediction.

        Args:
            history: History object to update
            prediction: Prediction from assistant
        """
        output_fields = self.signature.get_output_fields()
        content_parts = []

        for field_name in output_fields:
            if hasattr(prediction, field_name):
                value = getattr(prediction, field_name)
                if value:
                    content_parts.append(f"[[ ## {field_name} ## ]]\n{value}")

        content = "\n".join(content_parts) if content_parts else ""
        history.add_assistant_message(content)

    @retry(
        retry=retry_if_exception_type(AdapterParseError),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=0.1, max=3),
    )
    async def _aforward(self, completion_kwargs: dict[str, Any]) -> Prediction:
        """Process non-streaming LLM call with automatic retry on parse errors.

        Retries up to 2 times (3 total attempts) with exponential backoff (0.1-3s)
        when AdapterParseError occurs, giving the LLM multiple chances to format
        the response correctly.

        Args:
            completion_kwargs: Arguments for the completion API call

        Returns:
            Prediction object
        """

        response = await settings.lm.acomplete(**completion_kwargs)

        message = response.choices[0].message  # type: ignore[union-attr]
        native_tool_calls: list[ToolCall] = []
        for tc in message.tool_calls or []:
            try:
                arguments = (
                    json.loads(tc.function.arguments)  # type: ignore[union-attr]
                    if isinstance(tc.function.arguments, str)  # type: ignore[union-attr]
                    else tc.function.arguments  # type: ignore[union-attr]
                )
            except json.JSONDecodeError as exc:
                raise AdapterParseError(
                    adapter_name=self.adapter.__class__.__name__,
                    signature=self.signature,
                    lm_response=tc.function.arguments,  # type: ignore[union-attr]
                    parsed_result={
                        "error": f"Failed to parse tool call {tc.id} arguments as JSON."
                    },
                ) from exc

            else:
                native_tool_calls.append(
                    ToolCall(call_id=tc.id, name=tc.function.name, args=arguments)  # type: ignore[union-attr]
                )

        _, completion_text = self.adapter.split_reasoning_and_content_delta(response)  # type: ignore[arg-type]
        outputs = self.adapter.parse_outputs(self.signature, completion_text)

        self.adapter.validate_outputs(self.signature, outputs, native_tool_calls, completion_text)

        prediction = Prediction(module=self, native_tool_calls=native_tool_calls, **outputs)
        emit_event(prediction)  # If a stream is active, emit the final prediction

        return prediction

    @retry(
        retry=retry_if_exception_type(AdapterParseError),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=0.1, max=3),
    )
    async def _astream(self, completion_kwargs: dict[str, Any]) -> Prediction:
        """Process streaming LLM call with automatic retry on parse errors.

        Retries up to 2 times (3 total attempts) with exponential backoff (0.1-3s)
        when AdapterParseError occurs, giving the LLM multiple chances to format
        the response correctly.

        Args:
            completion_kwargs: Arguments for the completion API call

        Returns:
            Prediction object
        """

        try:
            # Reset parser for this attempt (important for retries)
            self.adapter.reset_parser()

            stream = await settings.lm.acomplete(**completion_kwargs)

            # Process each chunk through adapter, which yields events
            async for chunk in stream:  # type: ignore[union-attr]
                async for event in self.adapter.process_chunk(chunk, self, self.signature):
                    emit_event(event)

            # Finalize and get validated outputs
            outputs, native_tool_calls, _ = await self.adapter.finalize(self.signature)
            prediction = Prediction(
                module=self,
                native_tool_calls=native_tool_calls,
                **outputs,
            )
            emit_event(prediction)

            return prediction

        except Exception as exc:
            import traceback

            error_event = type(
                "StreamError",
                (StreamEvent,),
                {
                    "error": str(exc),
                    "traceback": traceback.format_exc(),
                    "module": self,
                },
            )()
            emit_event(error_event)
            raise
Attributes
model property

Get the model name override, or None to use LM's default.

Functions
__init__(signature, *, tools=None, max_turns=10, adapter=None, model=None, **kwargs)

Initialize a Predict module.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signature` | `type[Signature] \| str` | Signature defining inputs and outputs, or a string in format "inputs -> outputs" (e.g., "question -> answer") | required |
| `model` | `str \| None` | Model name (overrides global default) | `None` |
| `tools` | `list[Tool] \| None` | List of tool functions (decorated with @tool) or Pydantic models | `None` |
| `max_turns` | `int` | Maximum number of LLM calls for tool execution loop | `10` |
| `adapter` | `ChatAdapter \| None` | Custom adapter (defaults to ChatAdapter) | `None` |
| `**kwargs` | `Any` | Additional arguments for chat completion (temperature, callbacks, etc.) | `{}` |
Source code in src/udspy/module/predict.py
def __init__(
    self,
    signature: type[Signature] | str,
    *,
    tools: list[Tool] | None = None,
    max_turns: int = 10,
    adapter: ChatAdapter | None = None,
    model: str | None = None,
    **kwargs: Any,
):
    """Initialize a Predict module.

    Args:
        signature: Signature defining inputs and outputs, or a string in
                  format "inputs -> outputs" (e.g., "question -> answer")
        model: Model name (overrides global default)
        tools: List of tool functions (decorated with @tool) or Pydantic models
        max_turns: Maximum number of LLM calls for tool execution loop (default: 10)
        adapter: Custom adapter (defaults to ChatAdapter)
        **kwargs: Additional arguments for chat completion (temperature, callbacks, etc.)
    """
    if isinstance(signature, str):
        signature = Signature.from_string(signature)

    self.signature = signature
    self._model = model
    self._kwargs = kwargs
    self.max_turns = max_turns
    if max_turns < 1:
        raise ValueError("max_turns must be at least 1")
    self.adapter = adapter or ChatAdapter()

    self.init_module(tools=tools)
aexecute(*, stream=False, auto_execute_tools=True, history=None, **inputs) async

Core execution method - handles both streaming and non-streaming.

This is the single implementation point for LLM interaction. It always returns a Prediction, and emits events to the queue if one is active.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `bool` | If True, request streaming from OpenAI. If False, use regular API. | `False` |
| `auto_execute_tools` | `bool` | If True, automatically execute tools and continue. If False, return Prediction with tool_calls for manual handling. | `True` |
| `history` | `History \| None` | Optional History object for multi-turn conversations. | `None` |
| `**inputs` | `Any` | Input values matching the signature's input fields | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Final Prediction object (after all tool executions if auto_execute_tools=True) |

Source code in src/udspy/module/predict.py
@suspendable
@with_callbacks
async def aexecute(
    self,
    *,
    stream: bool = False,
    auto_execute_tools: bool = True,
    history: History | None = None,
    **inputs: Any,
) -> Prediction:
    """Core execution method - handles both streaming and non-streaming.

    This is the single implementation point for LLM interaction. It always
    returns a Prediction, and emits events to the queue if one is active.

    Args:
        stream: If True, request streaming from OpenAI. If False, use regular API.
        auto_execute_tools: If True, automatically execute tools and continue.
            If False, return Prediction with tool_calls for manual handling.
        history: Optional History object for multi-turn conversations.
        **inputs: Input values matching the signature's input fields

    Returns:
        Final Prediction object (after all tool executions if auto_execute_tools=True)
    """
    if history is None:
        history = History()

    self._validate_inputs(inputs)
    self._build_initial_messages(inputs, history)

    return await self._aexecute(
        stream=stream,
        auto_execute_tools=auto_execute_tools,
        history=history,
    )
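
With `auto_execute_tools=False`, `aexecute` returns as soon as the model requests a tool, so the caller can handle calls manually. A minimal sketch, not taken from the library docs; the `add` tool is illustrative:

```python
import asyncio

from pydantic import Field
from udspy import Predict, tool

@tool(name="add", description="Add two numbers")
def add(a: int = Field(...), b: int = Field(...)) -> int:
    return a + b

predictor = Predict("question -> answer", tools=[add])

async def main() -> None:
    pred = await predictor.aexecute(
        auto_execute_tools=False,
        question="What is 2 + 3?",
    )
    if not pred.is_final:
        # The model requested tools; each ToolCall carries call_id, name, args.
        for tc in pred.native_tool_calls or []:
            print(tc.name, tc.args)

asyncio.run(main())
```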
init_module(tools=None)

Initialize or reinitialize Predict with new tools.

This method rebuilds the tools dictionary and regenerates tool schemas. It's designed to be called from module callbacks to dynamically modify available tools during execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tools` | `list[Any] \| None` | New tools to initialize with: functions decorated with @tool, Tool instances, or None to clear all tools | `None` |
Example
from udspy import module_callback

@module_callback
def add_specialized_tools(context):
    # Get current tools
    current_tools = list(context.module.tools.values())

    # Add new tools
    new_tools = [weather_tool, calendar_tool]

    # Reinitialize with all tools
    context.module.init_module(tools=current_tools + new_tools)

    return "Added weather and calendar tools"
Source code in src/udspy/module/predict.py
def init_module(self, tools: list[Any] | None = None) -> None:
    """Initialize or reinitialize Predict with new tools.

    This method rebuilds the tools dictionary and regenerates tool schemas.
    It's designed to be called from module callbacks to dynamically modify
    available tools during execution.

    Args:
        tools: New tools to initialize with. Can be:
            - Functions decorated with @tool
            - Tool instances
            - None to clear all tools

    Example:
        ```python
        from udspy import module_callback

        @module_callback
        def add_specialized_tools(context):
            # Get current tools
            current_tools = list(context.module.tools.values())

            # Add new tools
            new_tools = [weather_tool, calendar_tool]

            # Reinitialize with all tools
            context.module.init_module(tools=current_tools + new_tools)

            return "Added weather and calendar tools"
        ```
    """
    self._init_tools(tools or [])
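
`init_module` can also be called directly between runs to swap the tool set. A hedged sketch, where `weather_tool` and `calendar_tool` are hypothetical `@tool`-decorated functions:

```python
from udspy import Predict

predictor = Predict("question -> answer")

# weather_tool and calendar_tool are hypothetical @tool-decorated functions.
predictor.init_module(tools=[weather_tool, calendar_tool])

print(list(predictor.tools))        # names of the registered tools
print(len(predictor.tool_schemas))  # schemas are regenerated to match
```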

PredictContext

Bases: ModuleContext

Context for Predict module callbacks.

Provides access to both the module and the conversation history, allowing callbacks to inspect past interactions.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `module` | `Predict` | The Predict module instance |
| `history` | `History \| None` | Conversation history (if provided) |

Source code in src/udspy/module/callbacks.py
class PredictContext(ModuleContext):
    """Context for Predict module callbacks.

    Provides access to both the module and the conversation history,
    allowing callbacks to inspect past interactions.

    Attributes:
        module: The Predict module instance
        history: Conversation history (if provided)
    """

    def __init__(self, module: "Predict", history: Optional["History"] = None):
        """Initialize Predict context.

        Args:
            module: The Predict module instance
            history: Conversation history (if any)
        """
        super().__init__(module)
        self.history = history
Functions
__init__(module, history=None)

Initialize Predict context.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `module` | `Predict` | The Predict module instance | required |
| `history` | `Optional[History]` | Conversation history (if any) | `None` |
Source code in src/udspy/module/callbacks.py
def __init__(self, module: "Predict", history: Optional["History"] = None):
    """Initialize Predict context.

    Args:
        module: The Predict module instance
        history: Conversation history (if any)
    """
    super().__init__(module)
    self.history = history
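
A hedged sketch of a module callback that uses this context, assuming a tool returns the callback during a `Predict` run:

```python
from udspy import module_callback

@module_callback
def summarize_conversation(context):
    # context is a PredictContext: the running module and its history.
    history = context.history
    count = len(history.messages) if history is not None else 0
    return f"The conversation currently holds {count} messages."
```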

Prediction

Bases: StreamEvent, dict[str, Any]

Final prediction result with attribute access.

This is both a StreamEvent (can be yielded from astream) and a dict (for convenient attribute access to outputs).

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `module` | `Module \| None` | The module that produced this prediction |
| `native_tool_calls` | `list[ToolCall] \| None` | Tool calls from native LLM response (if any) |

Example
pred = Prediction(answer="Paris", reasoning="France's capital")
print(pred.answer)  # "Paris"
print(pred["answer"])  # "Paris"
print(pred.is_final)  # True for top-level result
print(pred.module)  # Module instance that produced this
Source code in src/udspy/streaming.py
class Prediction(StreamEvent, dict[str, Any]):
    """Final prediction result with attribute access.

    This is both a StreamEvent (can be yielded from astream) and a dict
    (for convenient attribute access to outputs).

    Attributes:
        module: The module that produced this prediction
        native_tool_calls: Tool calls from native LLM response (if any)

    Example:
        ```python
        pred = Prediction(answer="Paris", reasoning="France's capital")
        print(pred.answer)  # "Paris"
        print(pred["answer"])  # "Paris"
        print(pred.is_final)  # True for top-level result
        print(pred.module)  # Module instance that produced this
        ```
    """

    def __init__(
        self,
        /,
        module: "Module | None" = None,
        native_tool_calls: list["ToolCall"] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.module = module
        self.native_tool_calls = native_tool_calls

    @property
    def is_final(self) -> bool:
        """Whether this is the final prediction (no pending tool calls)."""
        return bool(len(self.keys()) and not self.native_tool_calls)

    def __getattr__(self, name: str) -> Any:
        try:
            return self[name]
        except KeyError:
            raise AttributeError(f"Prediction has no attribute '{name}'") from None

    def __setattr__(self, name: str, value: Any) -> None:
        self[name] = value
Attributes
is_final property

Whether this is the final prediction (no pending tool calls).
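
When streaming, intermediate predictions (with pending tool calls) and the final one both arrive as events, and `is_final` distinguishes them. A sketch, assuming `predictor` exposes `astream` as in the `ReAct` examples below:

```python
async for event in predictor.astream(question="What is 2 + 2?"):
    if isinstance(event, Prediction):
        if event.is_final:
            print("final answer:", event.answer)
        else:
            print("pending tool calls:", event.native_tool_calls)
```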

ReAct

Bases: Module

ReAct (Reasoning and Acting) module for tool-using agents.

ReAct iteratively reasons about the current situation and decides whether to call a tool or finish the task. Key features:

  • Iterative reasoning with tool execution
  • Tool confirmation support for sensitive operations
  • Real-time streaming of reasoning and tool usage

Example (Basic Usage):

from udspy import ReAct, Signature, InputField, OutputField, tool
from pydantic import Field

@tool(name="search", description="Search for information")
def search(query: str = Field(...)) -> str:
    return f"Results for: {query}"

class QA(Signature):
    '''Answer questions using available tools.'''
    question: str = InputField()
    answer: str = OutputField()

react = ReAct(QA, tools=[search])
result = react(question="What is the weather in Tokyo?")

Example (Streaming):

# Stream the agent's reasoning process in real-time
async for event in react.astream(question="What is Python?"):
    if isinstance(event, OutputStreamChunk):
        print(event.delta, end="", flush=True)
    elif isinstance(event, Prediction):
        print(f"Answer: {event.answer}")

See examples/react_streaming.py for a complete streaming example.

Example (Tools with Confirmation):

from udspy import ConfirmationRequired, ConfirmationRejected

@tool(name="delete_file", require_confirmation=True)
def delete_file(path: str = Field(...)) -> str:
    return f"Deleted {path}"

react = ReAct(QA, tools=[delete_file])

try:
    result = await react.aforward(question="Delete /tmp/test.txt")
except ConfirmationRequired as e:
    # User is asked for confirmation
    print(f"Confirm: {e.question}")
    # Approve: respond_to_confirmation(e.confirmation_id, approved=True)
    # Reject: respond_to_confirmation(e.confirmation_id, approved=False, status="rejected")
    result = await react.aresume("yes", e)

Source code in src/udspy/module/react.py
class ReAct(Module):
    """ReAct (Reasoning and Acting) module for tool-using agents.

    ReAct iteratively reasons about the current situation and decides whether
    to call a tool or finish the task. Key features:

    - Iterative reasoning with tool execution
    - Tool confirmation support for sensitive operations
    - Real-time streaming of reasoning and tool usage

    Example (Basic Usage):
        ```python
        from udspy import ReAct, Signature, InputField, OutputField, tool
        from pydantic import Field

        @tool(name="search", description="Search for information")
        def search(query: str = Field(...)) -> str:
            return f"Results for: {query}"

        class QA(Signature):
            '''Answer questions using available tools.'''
            question: str = InputField()
            answer: str = OutputField()

        react = ReAct(QA, tools=[search])
        result = react(question="What is the weather in Tokyo?")
        ```

    Example (Streaming):
        ```python
        # Stream the agent's reasoning process in real-time
        async for event in react.astream(question="What is Python?"):
            if isinstance(event, OutputStreamChunk):
                print(event.delta, end="", flush=True)
            elif isinstance(event, Prediction):
                print(f"Answer: {event.answer}")
        ```

        See examples/react_streaming.py for a complete streaming example.

    Example (Tools with Confirmation):
        ```python
        from udspy import ConfirmationRequired, ConfirmationRejected

        @tool(name="delete_file", require_confirmation=True)
        def delete_file(path: str = Field(...)) -> str:
            return f"Deleted {path}"

        react = ReAct(QA, tools=[delete_file])

        try:
            result = await react.aforward(question="Delete /tmp/test.txt")
        except ConfirmationRequired as e:
            # User is asked for confirmation
            print(f"Confirm: {e.question}")
            # Approve: respond_to_confirmation(e.confirmation_id, approved=True)
            # Reject: respond_to_confirmation(e.confirmation_id, approved=False, status="rejected")
            result = await react.aresume("yes", e)
        ```
    """

    def __init__(
        self,
        signature: type[Signature] | str,
        tools: list[Callable | Tool],
        *,
        max_iters: int = 10,
        **kwargs: Any,
    ):
        """Initialize ReAct module.

        Args:
            signature: Signature defining inputs and outputs, or signature string
            tools: List of tool functions (decorated with @tool) or Tool objects
            max_iters: Maximum number of reasoning iterations (default: 10)
        """
        if isinstance(signature, str):
            signature = Signature.from_string(signature)

        self.signature = signature
        self.user_signature = signature
        self.max_iters = max_iters
        self._kwargs = kwargs
        self._context: ReactContext | None = None  # Current execution context

        self.init_module(tools=tools)

    def _init_tools(self) -> None:
        """Initialize tools dictionary with user-provided tools."""
        tool_list = [t if isinstance(t, Tool) else Tool(t) for t in self._tools]
        self.tools: dict[str, Tool] = {tool.name: tool for tool in tool_list if tool.name}
        self._add_builtin_tools()

    def _add_builtin_tools(self) -> None:
        """Add built-in finish tool."""
        outputs = ", ".join([f"`{k}`" for k in self.signature.get_output_fields().keys()])

        def finish_tool(**kwargs: Any) -> str:  # pyright: ignore[reportUnusedParameter]
            """Finish tool that accepts and ignores any arguments."""
            return "Task completed"

        self.tools["finish"] = Tool(
            func=finish_tool,
            name="finish",
            description=f"Call this when you have all information needed to produce {outputs}",
        )

    def _rebuild_signatures(self) -> None:
        """Rebuild react and extract signatures with current tools.

        This method reconstructs the signatures used by the ReAct module,
        incorporating the current set of tools. It's called during initialization
        and when tools are dynamically updated via init_module().
        """
        self.react_signature = self._build_react_signature()
        self.extract_signature = self._build_extract_signature()
        self.react_module = Predict(self.react_signature, **self._kwargs)
        self.extract_module = ChainOfThought(self.extract_signature, **self._kwargs)

    def _build_react_signature(self) -> type[Signature]:
        """Build ReAct signature with tool descriptions in instructions."""
        inputs = ", ".join([f"`{k}`" for k in self.user_signature.get_input_fields().keys()])
        outputs = ", ".join([f"`{k}`" for k in self.user_signature.get_output_fields().keys()])

        base_instructions = getattr(self.user_signature, "__doc__", "")
        instr = [f"{base_instructions}\n"] if base_instructions else []

        instr.extend(
            [
                f"You are an Agent. In each episode, you will be given the fields {inputs} as input. And you can see your past trajectory so far.",
                f"Your goal is to use one or more of the supplied tools to collect any necessary information for producing {outputs}.\n",
                "To do this, you will interleave next_thought, next_tool_name, and next_tool_args in each turn, and also when finishing the task.",
                "After each step, you receive a resulting observation, which gets appended to your trajectory.\n",
                "When writing next_thought, you may reason about the current situation and plan for future steps.",
                "When selecting the next_tool_name and its next_tool_args, the tool must be one of:\n",
            ]
        )

        instr.append(Tools(tools=list(self.tools.values())).format())
        instr.extend(
            [
                "IMPORTANT: You must respond with a JSON object in your message content containing the fields: "
                '{"next_thought": "...", "next_tool_name": "...", "next_tool_args": {...}}.',
                "NEVER use function calling or tool calling syntax - return the JSON as plain text in your response.",
            ]
        )

        react_input_fields: dict[str, type] = {
            "trajectory": str,
        }
        for name, field_info in self.user_signature.get_input_fields().items():
            react_input_fields[name] = field_info.annotation or str

        react_output_fields: dict[str, type] = {
            "next_thought": str,
            "next_tool_name": Literal[*self.tools.keys()],  # type: ignore[dict-item]
            "next_tool_args": dict[str, Any],
        }

        return make_signature(
            react_input_fields,
            react_output_fields,
            "\n".join(instr),
        )

    def _build_extract_signature(self) -> type[Signature]:
        """Build extract signature for final answer extraction from trajectory."""
        extract_input_fields: dict[str, type] = {}
        extract_output_fields: dict[str, type] = {}

        for name, field_info in self.user_signature.get_input_fields().items():
            extract_input_fields[name] = field_info.annotation or str

        for name, field_info in self.user_signature.get_output_fields().items():
            extract_output_fields[name] = field_info.annotation or str

        extract_input_fields["trajectory"] = str

        return make_signature(
            extract_input_fields,
            extract_output_fields,
            "Extract the final answer from the trajectory",
        )

    def init_module(self, tools: list[Any] | None = None) -> None:
        """Initialize or reinitialize ReAct with new tools.

        This method rebuilds the tools dictionary and regenerates the react signature
        with new tool descriptions. Built-in tools are automatically preserved.

        Args:
            tools: New tools to initialize with. Can be:
                - Functions decorated with @tool
                - Tool instances
                - None to clear all non-built-in tools

        Example:
            ```python
            from udspy import module_callback

            @module_callback
            def load_specialized_tools(context):
                # Get current non-built-in tools
                current_tools = [
                    t for t in context.module.tools.values()
                    if t.name not in builtin_tool_names
                ]

                # Add new tools
                new_tools = [weather_tool, calendar_tool]

                # Reinitialize with all tools
                context.module.init_module(tools=current_tools + new_tools)

                return f"Added {len(new_tools)} specialized tools"
            ```
        """

        self._tools = tools or []
        self._init_tools()
        self._rebuild_signatures()

    def _format_trajectory(self, trajectory: list[Episode]) -> str:
        """Format trajectory as a string for the LLM.

        Args:
            trajectory: List of episodes

        Returns:
            Formatted string representation
        """
        if not trajectory:
            return "No actions taken yet."

        lines = []
        for step, episode in enumerate(trajectory, start=1):
            lines.append(json.dumps({"step": step, **episode}))

        return "\n".join(lines)

    async def _execute_tool_call(self, tool_name: str, tool_args: dict[str, Any]) -> str:
        """Execute a single tool call and return observation.

        Uses self._context for accessing trajectory, input_args, etc.

        Args:
            tool_name: Name of tool to execute
            tool_args: Arguments for the tool

        Returns:
            Observation string from tool execution

        Raises:
            ConfirmationRequired: When human input is needed
        """
        logger.debug(f"Tool call - name: {tool_name}, args: {tool_args}")
        tool = None
        try:
            tool = self.tools[tool_name]
            result = await tool.acall(**tool_args)

            if is_module_callback(result):
                # Pass module's context to callback
                if self._context is None:
                    raise RuntimeError("Module callback called outside execution context")
                observation = await execute_function_async(result, {"context": self._context})
            else:
                observation = str(result)

            return observation
        except ConfirmationRequired as e:
            # Store context for resumption
            if self._context is not None:
                e.context = {
                    "trajectory": self._context.trajectory.copy(),
                    "input_args": self._context.input_args.copy(),
                    "stream": self._context.stream,
                }
            raise
        except Exception as e:
            parts = [
                f"Traceback '{tool_name}': {format_tool_exception(e)}.",
            ]
            if tool is not None:
                parts.append(f"Expected tool args schema: {tool.parameters}.")
            logger.warning(f"Tool execution failed: {e}")
            return " ".join(parts)

    async def _execute_iteration(
        self,
        *,
        stream: bool = False,
    ) -> bool:
        """
        Execute a single ReAct iteration (create one episode).
        Uses self._context for trajectory and input_args.

        Args:
            stream: Whether to stream sub-module execution

        Returns:
            should_stop: Whether to stop the ReAct loop

        Raises:
            ConfirmationRequired: When human input is needed
        """
        # Get context from instance
        if self._context is None:
            raise RuntimeError("_execute_iteration called outside execution context")

        trajectory = self._context.trajectory
        input_args = self._context.input_args

        # Normal flow: get next thought and tool calls from LLM
        formatted_trajectory = self._format_trajectory(trajectory)
        pred = await self.react_module.aexecute(
            stream=stream,
            **input_args,
            trajectory=formatted_trajectory,
        )

        thought = pred.get("next_thought", "").strip()
        tool_name = pred.get("next_tool_name", None)
        if tool_name not in self.tools:
            available = ", ".join(f"`{name}`" for name in self.tools.keys())
            raise ValueError(
                f"Invalid tool name selected by agent. Available tools: {available}"
            )

        tool_args = pred.get("next_tool_args", None)
        observation = await self._execute_tool_call(tool_name, tool_args)

        episode: Episode = {
            "thought": thought,
            "tool_name": tool_name,
            "tool_args": tool_args,
            "observation": observation,
        }
        trajectory.append(episode)

        should_stop = tool_name == "finish"
        return should_stop

    @with_callbacks
    async def aexecute(
        self,
        *,
        stream: bool = False,
        _trajectory: list[Episode] | None = None,
        history: History | None = None,
        **input_args: Any,
    ) -> Prediction:
        """Execute ReAct loop.

        Args:
            stream: Passed to sub-modules
            _trajectory: Internal - restored trajectory for resumption (list of completed episodes)
            history: History object updated with the final assistant message
            **input_args: Input values matching signature's input fields

        Returns:
            Prediction with trajectory and output fields

        Raises:
            ConfirmationRequired: When human input is needed
        """
        max_iters = input_args.pop("max_iters", self.max_iters)
        trajectory: list[Episode] = _trajectory if _trajectory is not None else []
        if history is None:
            history = History()

        # Set up React context for this execution
        self._context = ReactContext(
            module=self, trajectory=trajectory, input_args=input_args, stream=stream
        )

        try:
            # Continue with normal iteration loop
            while len(trajectory) < max_iters:
                try:
                    should_stop = await self._execute_iteration(stream=stream)
                    if should_stop:
                        break

                except ValueError as e:
                    logger.warning(f"Agent failed to select valid tool: {e}")
                    error_episode: Episode = {
                        "thought": "",
                        "tool_name": None,
                        "tool_args": None,
                        "observation": f"Error: {e}",
                    }
                    trajectory.append(error_episode)
                    break

            formatted_trajectory = self._format_trajectory(trajectory)
            extract = await self.extract_module.aexecute(
                stream=stream,
                **input_args,
                trajectory=formatted_trajectory,
            )
            result_dict = {
                key: value
                for key, value in extract.items()
                if key in self.signature.get_output_fields()
            }
            history.add_assistant_message(json.dumps(result_dict))

            prediction = Prediction(
                **result_dict,
                reasoning=extract["reasoning"],
                trajectory=trajectory,
                module=self,
            )
            emit_event(prediction)
            return prediction
        finally:
            # Clean up context
            self._context = None
Functions
__init__(signature, tools, *, max_iters=10, **kwargs)

Initialize ReAct module.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `signature` | `type[Signature] \| str` | Signature defining inputs and outputs, or signature string | required |
| `tools` | `list[Callable \| Tool]` | List of tool functions (decorated with @tool) or Tool objects | required |
| `max_iters` | `int` | Maximum number of reasoning iterations | `10` |
Source code in src/udspy/module/react.py
def __init__(
    self,
    signature: type[Signature] | str,
    tools: list[Callable | Tool],
    *,
    max_iters: int = 10,
    **kwargs: Any,
):
    """Initialize ReAct module.

    Args:
        signature: Signature defining inputs and outputs, or signature string
        tools: List of tool functions (decorated with @tool) or Tool objects
        max_iters: Maximum number of reasoning iterations (default: 10)
    """
    if isinstance(signature, str):
        signature = Signature.from_string(signature)

    self.signature = signature
    self.user_signature = signature
    self.max_iters = max_iters
    self._kwargs = kwargs
    self._context: ReactContext | None = None  # Current execution context

    self.init_module(tools=tools)
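
String signatures work here as well. A hedged quick-start, reusing the `search` tool from the class example and inspecting the returned trajectory step by step:

```python
react = ReAct("question -> answer", tools=[search])
result = react(question="Who wrote 'The Hobbit'?")

print(result.answer)
for step in result.trajectory:  # one Episode dict per iteration
    print(step["tool_name"], "->", step["observation"])
```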
aexecute(*, stream=False, _trajectory=None, history=None, **input_args) async

Execute ReAct loop.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stream` | `bool` | Passed to sub-modules | `False` |
| `_trajectory` | `list[Episode] \| None` | Internal - restored trajectory for resumption (list of completed episodes) | `None` |
| `history` | `History \| None` | History object updated with the final assistant message | `None` |
| `**input_args` | `Any` | Input values matching signature's input fields | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Prediction` | Prediction with trajectory and output fields |

Raises:

| Type | Description |
| --- | --- |
| `ConfirmationRequired` | When human input is needed |

Source code in src/udspy/module/react.py
@with_callbacks
async def aexecute(
    self,
    *,
    stream: bool = False,
    _trajectory: list[Episode] | None = None,
    history: History | None = None,
    **input_args: Any,
) -> Prediction:
    """Execute ReAct loop.

    Args:
        stream: Passed to sub-modules
        _trajectory: Internal - restored trajectory for resumption (list of completed episodes)
        history: History object updated with the final assistant message
        **input_args: Input values matching signature's input fields

    Returns:
        Prediction with trajectory and output fields

    Raises:
        ConfirmationRequired: When human input is needed
    """
    max_iters = input_args.pop("max_iters", self.max_iters)
    trajectory: list[Episode] = _trajectory if _trajectory is not None else []
    if history is None:
        history = History()

    # Set up React context for this execution
    self._context = ReactContext(
        module=self, trajectory=trajectory, input_args=input_args, stream=stream
    )

    try:
        # Continue with normal iteration loop
        while len(trajectory) < max_iters:
            try:
                should_stop = await self._execute_iteration(stream=stream)
                if should_stop:
                    break

            except ValueError as e:
                logger.warning(f"Agent failed to select valid tool: {e}")
                error_episode: Episode = {
                    "thought": "",
                    "tool_name": None,
                    "tool_args": None,
                    "observation": f"Error: {e}",
                }
                trajectory.append(error_episode)
                break

        formatted_trajectory = self._format_trajectory(trajectory)
        extract = await self.extract_module.aexecute(
            stream=stream,
            **input_args,
            trajectory=formatted_trajectory,
        )
        result_dict = {
            key: value
            for key, value in extract.items()
            if key in self.signature.get_output_fields()
        }
        history.add_assistant_message(json.dumps(result_dict))

        prediction = Prediction(
            **result_dict,
            reasoning=extract["reasoning"],
            trajectory=trajectory,
            module=self,
        )
        emit_event(prediction)
        return prediction
    finally:
        # Clean up context
        self._context = None
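
Because `aexecute` pops `max_iters` from the keyword inputs before they reach the sub-modules, the instance default can be overridden per call. A sketch, assuming the `react` instance from the earlier examples:

```python
result = await react.aexecute(question="What is udspy?", max_iters=3)
print(len(result.trajectory))  # at most 3 episodes were recorded
```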
init_module(tools=None)

Initialize or reinitialize ReAct with new tools.

This method rebuilds the tools dictionary and regenerates the react signature with new tool descriptions. Built-in tools are automatically preserved.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tools` | `list[Any] \| None` | New tools to initialize with: functions decorated with @tool, Tool instances, or None to clear all non-built-in tools | `None` |
Example

from udspy import module_callback

@module_callback
def load_specialized_tools(context):
    # Get current non-built-in tools
    current_tools = [
        t for t in context.module.tools.values()
        if t.name not in builtin_tool_names
    ]

    # Add new tools
    new_tools = [weather_tool, calendar_tool]

    # Reinitialize with all tools
    context.module.init_module(tools=current_tools + new_tools)

    return f"Added {len(new_tools)} specialized tools"

Source code in src/udspy/module/react.py
def init_module(self, tools: list[Any] | None = None) -> None:
    """Initialize or reinitialize ReAct with new tools.

    This method rebuilds the tools dictionary and regenerates the react signature
    with new tool descriptions. Built-in tools are automatically preserved.

    Args:
        tools: New tools to initialize with. Can be:
            - Functions decorated with @tool
            - Tool instances
            - None to clear all non-built-in tools

    Example:
        ```python
        from udspy import module_callback

        @module_callback
        def load_specialized_tools(context):
            # Get current non-built-in tools
            current_tools = [
                t for t in context.module.tools.values()
                if t.name not in builtin_tool_names
            ]

            # Add new tools
            new_tools = [weather_tool, calendar_tool]

            # Reinitialize with all tools
            context.module.init_module(tools=current_tools + new_tools)

            return f"Added {len(new_tools)} specialized tools"
        ```
    """

    self._tools = tools or []
    self._init_tools()
    self._rebuild_signatures()

Functions

is_module_callback(obj)

Check if an object is a module callback.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `obj` | `Any` | Object to check | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if obj is a ModuleCallback instance |

Source code in src/udspy/module/callbacks.py
def is_module_callback(obj: Any) -> bool:
    """Check if an object is a module callback.

    Args:
        obj: Object to check

    Returns:
        True if obj is a ModuleCallback instance
    """
    return isinstance(obj, ModuleCallback)
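
A hedged sketch of the dispatch pattern `Predict` uses internally (see `_aexecute_tool_calls` above): a tool may return either a plain value or a module callback to be invoked with the execution context. Here `some_tool`, `predictor`, and `history` are assumptions for illustration:

```python
result = some_tool(query="...")  # some_tool is hypothetical
if is_module_callback(result):
    # Run the callback against the current module and conversation history.
    content = result(PredictContext(module=predictor, history=history))
else:
    content = str(result)
```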