Skip to content

Callbacks API

udspy.callback

Callback system for telemetry and monitoring.

This module provides a DSPy-compatible callback system for tracking LLM calls, module executions, and tool invocations. Compatible with Opik, MLflow, and other observability tools that support DSPy callbacks.

Classes

BaseCallback

Base class for callback handlers.

Subclass this and implement the desired handlers to track LLM calls, module executions, and tool invocations. Compatible with the DSPy callback interface.

Example
from udspy import settings
from udspy.callback import BaseCallback

class LoggingCallback(BaseCallback):
    """Example callback that prints LLM and tool activity to stdout."""

    def on_lm_start(self, call_id, instance, inputs):
        print(f"LLM called with: {inputs}")

    def on_lm_end(self, call_id, outputs, exception=None):
        # `outputs` is None when the call failed, so report the error instead.
        if exception is not None:
            print(f"LLM failed: {exception}")
        else:
            print(f"LLM returned: {outputs}")

    def on_tool_start(self, call_id, instance, inputs):
        print(f"Tool {instance.name} called with: {inputs}")

    def on_tool_end(self, call_id, outputs, exception=None):
        # Mirror on_lm_end: a failed tool would otherwise print "Tool returned: None".
        if exception is not None:
            print(f"Tool failed: {exception}")
        else:
            print(f"Tool returned: {outputs}")

# Set globally
settings.configure(callbacks=[LoggingCallback()])
Source code in src/udspy/callback.py
class BaseCallback:
    """Base class for callback handlers.

    Subclass this and implement the desired handlers to track LLM calls, module
    executions, and tool invocations. Compatible with DSPy callback interface.

    Example:
        ```python
        from udspy import settings
        from udspy.callback import BaseCallback

        class LoggingCallback(BaseCallback):
            def on_lm_start(self, call_id, instance, inputs):
                print(f"LLM called with: {inputs}")

            def on_lm_end(self, call_id, outputs, exception):
                if exception:
                    print(f"LLM failed: {exception}")
                else:
                    print(f"LLM returned: {outputs}")

            def on_tool_start(self, call_id, instance, inputs):
                print(f"Tool {instance.name} called with: {inputs}")

            def on_tool_end(self, call_id, outputs, exception):
                print(f"Tool returned: {outputs}")

        # Set globally
        settings.configure(callbacks=[LoggingCallback()])
        ```
    """

    def on_module_start(
        self,
        call_id: str,
        instance: Any,
        inputs: dict[str, Any],
    ) -> None:
        """Called when a module's forward() method starts.

        Args:
            call_id: Unique identifier for this call
            instance: The Module instance being called
            inputs: Input arguments as key-value pairs
        """
        pass

    def on_module_end(
        self,
        call_id: str,
        outputs: Any | None,
        exception: Exception | None = None,
    ) -> None:
        """Called when a module's forward() method completes.

        Args:
            call_id: Unique identifier for this call
            outputs: The module's output, or None if exception occurred
            exception: Exception raised during execution, if any
        """
        pass

    def on_lm_start(
        self,
        call_id: str,
        instance: Any,
        inputs: dict[str, Any],
    ) -> None:
        """Called when an LLM call starts.

        Args:
            call_id: Unique identifier for this call
            instance: The LLM client or adapter instance
            inputs: LLM input parameters (messages, model, etc.)
        """
        pass

    def on_lm_end(
        self,
        call_id: str,
        outputs: dict[str, Any] | None,
        exception: Exception | None = None,
    ) -> None:
        """Called when an LLM call completes.

        Args:
            call_id: Unique identifier for this call
            outputs: LLM response, or None if exception occurred
            exception: Exception raised during execution, if any
        """
        pass

    def on_tool_start(
        self,
        call_id: str,
        instance: Any,
        inputs: dict[str, Any],
    ) -> None:
        """Called when a tool is invoked.

        Args:
            call_id: Unique identifier for this call
            instance: The Tool instance being called
            inputs: Tool input arguments as key-value pairs
        """
        pass

    def on_tool_end(
        self,
        call_id: str,
        outputs: Any | None,
        exception: Exception | None = None,
    ) -> None:
        """Called when a tool invocation completes.

        Args:
            call_id: Unique identifier for this call
            outputs: Tool output, or None if exception occurred
            exception: Exception raised during execution, if any
        """
        pass
Functions
on_lm_end(call_id, outputs, exception=None)

Called when an LLM call completes.

Parameters:

Name Type Description Default
call_id str

Unique identifier for this call

required
outputs dict[str, Any] | None

LLM response, or None if exception occurred

required
exception Exception | None

Exception raised during execution, if any

None
Source code in src/udspy/callback.py
def on_lm_end(
    self,
    call_id: str,
    outputs: dict[str, Any] | None,
    exception: Exception | None = None,
) -> None:
    """Called when an LLM call completes.

    Args:
        call_id: Unique identifier for this call
        outputs: LLM response, or None if exception occurred
        exception: Exception raised during execution, if any
    """
    pass
on_lm_start(call_id, instance, inputs)

Called when an LLM call starts.

Parameters:

Name Type Description Default
call_id str

Unique identifier for this call

required
instance Any

The LLM client or adapter instance

required
inputs dict[str, Any]

LLM input parameters (messages, model, etc.)

required
Source code in src/udspy/callback.py
def on_lm_start(
    self,
    call_id: str,
    instance: Any,
    inputs: dict[str, Any],
) -> None:
    """Hook fired when an LLM call starts.

    The base implementation is a no-op.

    Args:
        call_id: Unique identifier for this call
        instance: The LLM client or adapter instance
        inputs: LLM input parameters (messages, model, etc.)
    """
on_module_end(call_id, outputs, exception=None)

Called when a module's forward() method completes.

Parameters:

Name Type Description Default
call_id str

Unique identifier for this call

required
outputs Any | None

The module's output, or None if exception occurred

required
exception Exception | None

Exception raised during execution, if any

None
Source code in src/udspy/callback.py
def on_module_end(
    self,
    call_id: str,
    outputs: Any | None,
    exception: Exception | None = None,
) -> None:
    """Called when a module's forward() method completes.

    Args:
        call_id: Unique identifier for this call
        outputs: The module's output, or None if exception occurred
        exception: Exception raised during execution, if any
    """
    pass
on_module_start(call_id, instance, inputs)

Called when a module's forward() method starts.

Parameters:

Name Type Description Default
call_id str

Unique identifier for this call

required
instance Any

The Module instance being called

required
inputs dict[str, Any]

Input arguments as key-value pairs

required
Source code in src/udspy/callback.py
def on_module_start(
    self,
    call_id: str,
    instance: Any,
    inputs: dict[str, Any],
) -> None:
    """Hook fired when a module's forward() method starts.

    The base implementation is a no-op.

    Args:
        call_id: Unique identifier for this call
        instance: The Module instance being called
        inputs: Input arguments as key-value pairs
    """
on_tool_end(call_id, outputs, exception=None)

Called when a tool invocation completes.

Parameters:

Name Type Description Default
call_id str

Unique identifier for this call

required
outputs Any | None

Tool output, or None if exception occurred

required
exception Exception | None

Exception raised during execution, if any

None
Source code in src/udspy/callback.py
def on_tool_end(
    self,
    call_id: str,
    outputs: Any | None,
    exception: Exception | None = None,
) -> None:
    """Called when a tool invocation completes.

    Args:
        call_id: Unique identifier for this call
        outputs: Tool output, or None if exception occurred
        exception: Exception raised during execution, if any
    """
    pass
on_tool_start(call_id, instance, inputs)

Called when a tool is invoked.

Parameters:

Name Type Description Default
call_id str

Unique identifier for this call

required
instance Any

The Tool instance being called

required
inputs dict[str, Any]

Tool input arguments as key-value pairs

required
Source code in src/udspy/callback.py
def on_tool_start(
    self,
    call_id: str,
    instance: Any,
    inputs: dict[str, Any],
) -> None:
    """Hook fired when a tool is invoked.

    The base implementation is a no-op.

    Args:
        call_id: Unique identifier for this call
        instance: The Tool instance being called
        inputs: Tool input arguments as key-value pairs
    """

Functions

with_callbacks(fn)

Decorator to add callback functionality to methods.

Automatically calls appropriate callback handlers before and after method execution. Handles both sync and async methods.

The decorator determines which callback handlers to call based on the instance type (Module, Tool, etc.) and method name.

Source code in src/udspy/callback.py
def with_callbacks(fn: Callable) -> Callable:
    """Wrap a method so callback handlers fire around each invocation.

    The returned wrapper looks up the active callbacks on every call. When
    none are configured it calls ``fn`` directly with no extra bookkeeping.
    Otherwise it:

    1. generates a fresh ``call_id`` (``uuid4().hex``),
    2. runs every callback's start handler with
       ``inputs={"kwargs": kwargs, "args": args}``,
    3. publishes ``call_id`` through ``ACTIVE_CALL_ID`` while ``fn`` runs and
       puts the previous value back afterwards,
    4. runs every callback's end handler with the result or the exception.

    Handlers are looked up with ``_get_on_start_handler`` and
    ``_get_on_end_handler``, which receive the callback, the instance and
    ``fn`` (i.e. the choice depends on the instance type and the method).
    An ``Exception`` raised by a handler is logged as a warning and never
    propagates to the caller.

    Only ``Exception`` subclasses raised by ``fn`` are captured for the end
    handlers. For any other ``BaseException`` the end handlers still run, but
    receive ``outputs=None`` and ``exception=None``.

    Args:
        fn: The method to wrap; may be a regular or a coroutine function.
            Its first positional argument is treated as the instance.

    Returns:
        A sync or async wrapper (matching ``fn``) that preserves ``fn``'s
        metadata via ``functools.wraps``.
    """

    def _get_active_callbacks(instance: Any) -> list[BaseCallback]:
        """Return the callbacks configured on the global settings object.

        ``instance`` is accepted but not consulted here.
        """
        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import -- TODO confirm).
        from udspy.settings import settings

        return settings.callbacks

    def _execute_start_callbacks(
        instance: Any,
        fn: Callable,
        call_id: str,
        callbacks: list[BaseCallback],
        args: tuple,
        kwargs: dict,
    ) -> None:
        """Invoke the start handler of every callback, logging any failure."""
        # The same inputs mapping is shared by all handlers of this call.
        inputs = {"kwargs": kwargs, "args": args}

        for callback in callbacks:
            try:
                on_start = _get_on_start_handler(callback, instance, fn)
                on_start(call_id=call_id, instance=instance, inputs=inputs)
            except Exception as e:
                # A misbehaving callback must not break the wrapped call.
                logger.warning(f"Error in callback {callback.__class__.__name__}.on_*_start: {e}")

    def _execute_end_callbacks(
        instance: Any,
        fn: Callable,
        call_id: str,
        results: Any,
        exception: Exception | None,
        callbacks: list[BaseCallback],
    ) -> None:
        """Invoke the end handler of every callback, logging any failure."""
        for callback in callbacks:
            try:
                on_end = _get_on_end_handler(callback, instance, fn)
                on_end(call_id=call_id, outputs=results, exception=exception)
            except Exception as e:
                # A misbehaving callback must not break the wrapped call.
                logger.warning(f"Error in callback {callback.__class__.__name__}.on_*_end: {e}")

    # Regular (non-coroutine) functions get a synchronous wrapper.
    if not inspect.iscoroutinefunction(fn):

        @functools.wraps(fn)
        def sync_wrapper(instance: Any, *args: Any, **kwargs: Any) -> Any:
            active = _get_active_callbacks(instance)
            if not active:
                # Fast path: nothing to notify.
                return fn(instance, *args, **kwargs)

            call_id = uuid.uuid4().hex
            _execute_start_callbacks(instance, fn, call_id, active, args, kwargs)

            # Remember the enclosing call so nested calls can be tracked,
            # then mark this call as the active one.
            outer_call_id = ACTIVE_CALL_ID.get()
            ACTIVE_CALL_ID.set(call_id)

            outputs, error = None, None
            try:
                outputs = fn(instance, *args, **kwargs)
            except Exception as e:
                error = e
                raise
            else:
                return outputs
            finally:
                # Put the enclosing call ID back before notifying end handlers.
                ACTIVE_CALL_ID.set(outer_call_id)
                _execute_end_callbacks(instance, fn, call_id, outputs, error, active)

        return sync_wrapper

    # Coroutine functions get an async wrapper with the same sequence of steps.
    @functools.wraps(fn)
    async def async_wrapper(instance: Any, *args: Any, **kwargs: Any) -> Any:
        active = _get_active_callbacks(instance)
        if not active:
            # Fast path: nothing to notify.
            return await fn(instance, *args, **kwargs)

        call_id = uuid.uuid4().hex
        _execute_start_callbacks(instance, fn, call_id, active, args, kwargs)

        # Remember the enclosing call so nested calls can be tracked,
        # then mark this call as the active one.
        outer_call_id = ACTIVE_CALL_ID.get()
        ACTIVE_CALL_ID.set(call_id)

        outputs, error = None, None
        try:
            outputs = await fn(instance, *args, **kwargs)
        except Exception as e:
            error = e
            raise
        else:
            return outputs
        finally:
            # Put the enclosing call ID back before notifying end handlers.
            ACTIVE_CALL_ID.set(outer_call_id)
            _execute_end_callbacks(instance, fn, call_id, outputs, error, active)

    return async_wrapper