"""Base callback handler for LangChain."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, TypeVar, Union
from uuid import UUID
from tenacity import RetryCallState
if TYPE_CHECKING:
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult
[docs]
class RetrieverManagerMixin:
"""Mixin for Retriever callbacks."""
[docs]
def on_retriever_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run when Retriever errors.
Args:
error (BaseException): The error that occurred.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
def on_retriever_end(
self,
documents: Sequence[Document],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run when Retriever ends running.
Args:
documents (Sequence[Document]): The documents retrieved.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
class LLMManagerMixin:
"""Mixin for LLM callbacks."""
[docs]
def on_llm_new_token(
self,
token: str,
*,
chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run on new LLM token. Only available when streaming is enabled.
Args:
token (str): The new token.
chunk (GenerationChunk | ChatGenerationChunk): The new generated chunk,
containing content and other information.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
def on_llm_end(
self,
response: LLMResult,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run when LLM ends running.
Args:
response (LLMResult): The response which was generated.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
def on_llm_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run when LLM errors.
Args:
error (BaseException): The error that occurred.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
class ChainManagerMixin:
"""Mixin for chain callbacks."""
[docs]
def on_chain_end(
self,
outputs: Dict[str, Any],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run when chain ends running.
Args:
outputs (Dict[str, Any]): The outputs of the chain.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments."""
[docs]
def on_chain_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run when chain errors.
Args:
error (BaseException): The error that occurred.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments."""
[docs]
def on_agent_action(
self,
action: AgentAction,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run on agent action.
Args:
action (AgentAction): The agent action.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments."""
[docs]
def on_agent_finish(
self,
finish: AgentFinish,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run on the agent end.
Args:
finish (AgentFinish): The agent finish.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments."""
[docs]
class CallbackManagerMixin:
"""Mixin for callback manager."""
[docs]
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""Run when LLM starts running.
**ATTENTION**: This method is called for non-chat models (regular LLMs). If
you're implementing a handler for a chat model,
you should use on_chat_model_start instead.
Args:
serialized (Dict[str, Any]): The serialized LLM.
prompts (List[str]): The prompts.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
[docs]
def on_chat_model_start(
self,
serialized: Dict[str, Any],
messages: List[List[BaseMessage]],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""Run when a chat model starts running.
**ATTENTION**: This method is called for chat models. If you're implementing
a handler for a non-chat model, you should use on_llm_start instead.
Args:
serialized (Dict[str, Any]): The serialized chat model.
messages (List[List[BaseMessage]]): The messages.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
# NotImplementedError is thrown intentionally
# Callback handler will fall back to on_llm_start if this is exception is thrown
raise NotImplementedError(
f"{self.__class__.__name__} does not implement `on_chat_model_start`"
)
[docs]
def on_retriever_start(
self,
serialized: Dict[str, Any],
query: str,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""Run when the Retriever starts running.
Args:
serialized (Dict[str, Any]): The serialized Retriever.
query (str): The query.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
[docs]
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""Run when a chain starts running.
Args:
serialized (Dict[str, Any]): The serialized chain.
inputs (Dict[str, Any]): The inputs.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
[docs]
class RunManagerMixin:
"""Mixin for run manager."""
[docs]
def on_text(
self,
text: str,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run on an arbitrary text.
Args:
text (str): The text.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
def on_retry(
self,
retry_state: RetryCallState,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run on a retry event.
Args:
retry_state (RetryCallState): The retry state.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
def on_custom_event(
self,
name: str,
data: Any,
*,
run_id: UUID,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""Override to define a handler for a custom event.
Args:
name: The name of the custom event.
data: The data for the custom event. Format will match
the format specified by the user.
run_id: The ID of the run.
tags: The tags associated with the custom event
(includes inherited tags).
metadata: The metadata associated with the custom event
(includes inherited metadata).
.. versionadded:: 0.2.15
"""
[docs]
class BaseCallbackHandler(
LLMManagerMixin,
ChainManagerMixin,
ToolManagerMixin,
RetrieverManagerMixin,
CallbackManagerMixin,
RunManagerMixin,
):
"""Base callback handler for LangChain."""
raise_error: bool = False
"""Whether to raise an error if an exception occurs."""
run_inline: bool = False
"""Whether to run the callback inline."""
@property
def ignore_llm(self) -> bool:
"""Whether to ignore LLM callbacks."""
return False
@property
def ignore_retry(self) -> bool:
"""Whether to ignore retry callbacks."""
return False
@property
def ignore_chain(self) -> bool:
"""Whether to ignore chain callbacks."""
return False
@property
def ignore_agent(self) -> bool:
"""Whether to ignore agent callbacks."""
return False
@property
def ignore_retriever(self) -> bool:
"""Whether to ignore retriever callbacks."""
return False
@property
def ignore_chat_model(self) -> bool:
"""Whether to ignore chat model callbacks."""
return False
@property
def ignore_custom_event(self) -> bool:
"""Ignore custom event."""
return False
[docs]
class AsyncCallbackHandler(BaseCallbackHandler):
"""Async callback handler for LangChain."""
[docs]
async def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
"""Run when LLM starts running.
**ATTENTION**: This method is called for non-chat models (regular LLMs). If
you're implementing a handler for a chat model,
you should use on_chat_model_start instead.
Args:
serialized (Dict[str, Any]): The serialized LLM.
prompts (List[str]): The prompts.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_chat_model_start(
self,
serialized: Dict[str, Any],
messages: List[List[BaseMessage]],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
"""Run when a chat model starts running.
**ATTENTION**: This method is called for chat models. If you're implementing
a handler for a non-chat model, you should use on_llm_start instead.
Args:
serialized (Dict[str, Any]): The serialized chat model.
messages (List[List[BaseMessage]]): The messages.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
# NotImplementedError is thrown intentionally
# Callback handler will fall back to on_llm_start if this is exception is thrown
raise NotImplementedError(
f"{self.__class__.__name__} does not implement `on_chat_model_start`"
)
[docs]
async def on_llm_new_token(
self,
token: str,
*,
chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run on new LLM token. Only available when streaming is enabled.
Args:
token (str): The new token.
chunk (GenerationChunk | ChatGenerationChunk): The new generated chunk,
containing content and other information.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_llm_end(
self,
response: LLMResult,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run when LLM ends running.
Args:
response (LLMResult): The response which was generated.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_llm_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run when LLM errors.
Args:
error: The error that occurred.
run_id: The run ID. This is the ID of the current run.
parent_run_id: The parent run ID. This is the ID of the parent run.
tags: The tags.
kwargs (Any): Additional keyword arguments.
- response (LLMResult): The response which was generated before
the error occurred.
"""
[docs]
async def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
"""Run when a chain starts running.
Args:
serialized (Dict[str, Any]): The serialized chain.
inputs (Dict[str, Any]): The inputs.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_chain_end(
self,
outputs: Dict[str, Any],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run when a chain ends running.
Args:
outputs (Dict[str, Any]): The outputs of the chain.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_chain_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run when chain errors.
Args:
error (BaseException): The error that occurred.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_text(
self,
text: str,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run on an arbitrary text.
Args:
text (str): The text.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_retry(
self,
retry_state: RetryCallState,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""Run on a retry event.
Args:
retry_state (RetryCallState): The retry state.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_agent_action(
self,
action: AgentAction,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run on agent action.
Args:
action (AgentAction): The agent action.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_agent_finish(
self,
finish: AgentFinish,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run on the agent end.
Args:
finish (AgentFinish): The agent finish.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_retriever_start(
self,
serialized: Dict[str, Any],
query: str,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
"""Run on the retriever start.
Args:
serialized (Dict[str, Any]): The serialized retriever.
query (str): The query.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
metadata (Optional[Dict[str, Any]]): The metadata.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_retriever_end(
self,
documents: Sequence[Document],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run on the retriever end.
Args:
documents (Sequence[Document]): The documents retrieved.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments."""
[docs]
async def on_retriever_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
**kwargs: Any,
) -> None:
"""Run on retriever error.
Args:
error (BaseException): The error that occurred.
run_id (UUID): The run ID. This is the ID of the current run.
parent_run_id (UUID): The parent run ID. This is the ID of the parent run.
tags (Optional[List[str]]): The tags.
kwargs (Any): Additional keyword arguments.
"""
[docs]
async def on_custom_event(
self,
name: str,
data: Any,
*,
run_id: UUID,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
"""Override to define a handler for a custom event.
Args:
name: The name of the custom event.
data: The data for the custom event. Format will match
the format specified by the user.
run_id: The ID of the run.
tags: The tags associated with the custom event
(includes inherited tags).
metadata: The metadata associated with the custom event
(includes inherited metadata).
.. versionadded:: 0.2.15
"""
T = TypeVar("T", bound="BaseCallbackManager")
[docs]
class BaseCallbackManager(CallbackManagerMixin):
"""Base callback manager for LangChain."""
[docs]
def __init__(
self,
handlers: List[BaseCallbackHandler],
inheritable_handlers: Optional[List[BaseCallbackHandler]] = None,
parent_run_id: Optional[UUID] = None,
*,
tags: Optional[List[str]] = None,
inheritable_tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
inheritable_metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Initialize callback manager.
Args:
handlers (List[BaseCallbackHandler]): The handlers.
inheritable_handlers (Optional[List[BaseCallbackHandler]]):
The inheritable handlers. Default is None.
parent_run_id (Optional[UUID]): The parent run ID. Default is None.
tags (Optional[List[str]]): The tags. Default is None.
inheritable_tags (Optional[List[str]]): The inheritable tags.
Default is None.
metadata (Optional[Dict[str, Any]]): The metadata. Default is None.
"""
self.handlers: List[BaseCallbackHandler] = handlers
self.inheritable_handlers: List[BaseCallbackHandler] = (
inheritable_handlers or []
)
self.parent_run_id: Optional[UUID] = parent_run_id
self.tags = tags or []
self.inheritable_tags = inheritable_tags or []
self.metadata = metadata or {}
self.inheritable_metadata = inheritable_metadata or {}
[docs]
def copy(self: T) -> T:
"""Copy the callback manager."""
return self.__class__(
handlers=self.handlers,
inheritable_handlers=self.inheritable_handlers,
parent_run_id=self.parent_run_id,
tags=self.tags,
inheritable_tags=self.inheritable_tags,
metadata=self.metadata,
inheritable_metadata=self.inheritable_metadata,
)
@property
def is_async(self) -> bool:
"""Whether the callback manager is async."""
return False
[docs]
def add_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
"""Add a handler to the callback manager.
Args:
handler (BaseCallbackHandler): The handler to add.
inherit (bool): Whether to inherit the handler. Default is True.
"""
if handler not in self.handlers:
self.handlers.append(handler)
if inherit and handler not in self.inheritable_handlers:
self.inheritable_handlers.append(handler)
[docs]
def remove_handler(self, handler: BaseCallbackHandler) -> None:
"""Remove a handler from the callback manager.
Args:
handler (BaseCallbackHandler): The handler to remove.
"""
self.handlers.remove(handler)
self.inheritable_handlers.remove(handler)
[docs]
def set_handlers(
self, handlers: List[BaseCallbackHandler], inherit: bool = True
) -> None:
"""Set handlers as the only handlers on the callback manager.
Args:
handlers (List[BaseCallbackHandler]): The handlers to set.
inherit (bool): Whether to inherit the handlers. Default is True.
"""
self.handlers = []
self.inheritable_handlers = []
for handler in handlers:
self.add_handler(handler, inherit=inherit)
[docs]
def set_handler(self, handler: BaseCallbackHandler, inherit: bool = True) -> None:
"""Set handler as the only handler on the callback manager.
Args:
handler (BaseCallbackHandler): The handler to set.
inherit (bool): Whether to inherit the handler. Default is True.
"""
self.set_handlers([handler], inherit=inherit)
Callbacks = Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]