Source code for langchain_aws.llms.sagemaker_endpoint

"""Sagemaker InvokeEndpoint API."""

import io
import re
from abc import abstractmethod
from typing import Any, Dict, Generic, Iterator, List, Mapping, Optional, TypeVar, Union

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import Extra, root_validator

INPUT_TYPE = TypeVar("INPUT_TYPE", bound=Union[str, List[str]])
OUTPUT_TYPE = TypeVar("OUTPUT_TYPE", bound=Union[str, List[List[float]], Iterator])


[docs] def enforce_stop_tokens(text: str, stop: List[str]) -> str: """Cut off the text as soon as any stop words occur.""" return re.split("|".join(stop), text, maxsplit=1)[0]
[docs] class LineIterator: """ A helper class for parsing the byte stream input. The output of the model will be in the following format: b'{"outputs": [" a"]}\n' b'{"outputs": [" challenging"]}\n' b'{"outputs": [" problem"]}\n' ... While usually each PayloadPart event from the event stream will contain a byte array with a full json, this is not guaranteed and some of the json objects may be split acrossPayloadPart events. For example: {'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}} This class accounts for this by concatenating bytes written via the 'write' function and then exposing a method which will return lines (ending with a '\n' character) within the buffer via the 'scan_lines' function. It maintains the position of the last read position to ensure that previous bytes are not exposed again. For more details see: https://aws.amazon.com/blogs/machine-learning/elevating-the-generative-ai-experience-introducing-streaming-support-in-amazon-sagemaker-hosting/ """
[docs] def __init__(self, stream: Any) -> None: self.byte_iterator = iter(stream) self.buffer = io.BytesIO() self.read_pos = 0
def __iter__(self) -> "LineIterator": return self def __next__(self) -> Any: while True: self.buffer.seek(self.read_pos) line = self.buffer.readline() if line and line[-1] == ord("\n"): self.read_pos += len(line) return line[:-1] try: chunk = next(self.byte_iterator) except StopIteration: if self.read_pos < self.buffer.getbuffer().nbytes: continue raise if "PayloadPart" not in chunk: # Unknown Event Type continue self.buffer.seek(0, io.SEEK_END) self.buffer.write(chunk["PayloadPart"]["Bytes"])
[docs] class ContentHandlerBase(Generic[INPUT_TYPE, OUTPUT_TYPE]): """A handler class to transform input from LLM to a format that SageMaker endpoint expects. Similarly, the class handles transforming output from the SageMaker endpoint to a format that LLM class expects. """ """ Example: .. code-block:: python class ContentHandler(ContentHandlerBase): content_type = "application/json" accepts = "application/json" def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes: input_str = json.dumps({prompt: prompt, **model_kwargs}) return input_str.encode('utf-8') def transform_output(self, output: bytes) -> str: response_json = json.loads(output.read().decode("utf-8")) return response_json[0]["generated_text"] """ content_type: Optional[str] = "text/plain" """The MIME type of the input data passed to endpoint""" accepts: Optional[str] = "text/plain" """The MIME type of the response data returned from endpoint"""
[docs] @abstractmethod def transform_input(self, prompt: INPUT_TYPE, model_kwargs: Dict) -> bytes: """Transforms the input to a format that model can accept as the request Body. Should return bytes or seekable file like object in the format specified in the content_type request header. """
[docs] @abstractmethod def transform_output(self, output: bytes) -> OUTPUT_TYPE: """Transforms the output from the model to string that the LLM class expects. """
[docs] class LLMContentHandler(ContentHandlerBase[str, str]): """Content handler for LLM class."""
[docs] class SagemakerEndpoint(LLM): """Sagemaker Inference Endpoint models. To use, you must supply the endpoint name from your deployed Sagemaker model & the region where it is deployed. To authenticate, the AWS client uses the following methods to automatically load credentials: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html If a specific credential profile should be used, you must pass the name of the profile from the ~/.aws/credentials file that is to be used. Make sure the credentials / roles used have the required policies to access the Sagemaker endpoint. See: https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html """ """ Args: region_name: The aws region e.g., `us-west-2`. Fallsback to AWS_DEFAULT_REGION env variable or region specified in ~/.aws/config. credentials_profile_name: The name of the profile in the ~/.aws/credentials or ~/.aws/config files, which has either access keys or role information specified. If not specified, the default credential profile or, if on an EC2 instance, credentials from IMDS will be used. client: boto3 client for Sagemaker Endpoint content_handler: Implementation for model specific LLMContentHandler Example: .. code-block:: python from langchain_community.llms import SagemakerEndpoint endpoint_name = ( "my-endpoint-name" ) region_name = ( "us-west-2" ) credentials_profile_name = ( "default" ) se = SagemakerEndpoint( endpoint_name=endpoint_name, region_name=region_name, credentials_profile_name=credentials_profile_name ) #Use with boto3 client client = boto3.client( "sagemaker-runtime", region_name=region_name ) se = SagemakerEndpoint( endpoint_name=endpoint_name, client=client ) """ client: Any = None """Boto3 client for sagemaker runtime""" endpoint_name: str = "" """The name of the endpoint from the deployed Sagemaker model. Must be unique within an AWS Region.""" region_name: str = "" """The aws region where the Sagemaker model is deployed, eg. `us-west-2`.""" credentials_profile_name: Optional[str] = None """The name of the profile in the ~/.aws/credentials or ~/.aws/config files, which has either access keys or role information specified. If not specified, the default credential profile or, if on an EC2 instance, credentials from IMDS will be used. See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html """ content_handler: LLMContentHandler """The content handler class that provides an input and output transform functions to handle formats between LLM and the endpoint. """ streaming: bool = False """Whether to stream the results.""" """ Example: .. code-block:: python from langchain_community.llms.sagemaker_endpoint import LLMContentHandler class ContentHandler(LLMContentHandler): content_type = "application/json" accepts = "application/json" def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes: input_str = json.dumps({prompt: prompt, **model_kwargs}) return input_str.encode('utf-8') def transform_output(self, output: bytes) -> str: response_json = json.loads(output.read().decode("utf-8")) return response_json[0]["generated_text"] """ model_kwargs: Optional[Dict] = None """Keyword arguments to pass to the model.""" endpoint_kwargs: Optional[Dict] = None """Optional attributes passed to the invoke_endpoint function. See `boto3`_. docs for more info. .. _boto3: <https://boto3.amazonaws.com/v1/documentation/api/latest/index.html> """ class Config: """Configuration for this pydantic object.""" extra = Extra.forbid @root_validator() def validate_environment(cls, values: Dict) -> Dict: """Dont do anything if client provided externally""" if values.get("client") is not None: return values """Validate that AWS credentials to and python package exists in environment.""" try: import boto3 try: if values["credentials_profile_name"] is not None: session = boto3.Session( profile_name=values["credentials_profile_name"] ) else: # use default credentials session = boto3.Session() values["client"] = session.client( "sagemaker-runtime", region_name=values["region_name"] ) except Exception as e: raise ValueError( "Could not load credentials to authenticate with AWS client. " "Please check that credentials in the specified " "profile name are valid." ) from e except ImportError: raise ImportError( "Could not import boto3 python package. " "Please install it with `pip install boto3`." ) return values @property def _identifying_params(self) -> Mapping[str, Any]: """Get the identifying parameters.""" _model_kwargs = self.model_kwargs or {} return { **{"endpoint_name": self.endpoint_name}, **{"model_kwargs": _model_kwargs}, } @property def _llm_type(self) -> str: """Return type of llm.""" return "sagemaker_endpoint" def _stream( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Iterator[GenerationChunk]: _model_kwargs = self.model_kwargs or {} _model_kwargs = {**_model_kwargs, **kwargs} _endpoint_kwargs = self.endpoint_kwargs or {} try: resp = self.client.invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=self.content_handler.transform_input(prompt, _model_kwargs), ContentType=self.content_handler.content_type, **_endpoint_kwargs, ) iterator = LineIterator(resp["Body"]) for line in iterator: text = self.content_handler.transform_output(line) if stop is not None: text = enforce_stop_tokens(text, stop) if text: chunk = GenerationChunk(text=text) yield chunk if run_manager: run_manager.on_llm_new_token(chunk.text) except Exception as e: raise ValueError(f"Error raised by streaming inference endpoint: {e}") def _call( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> str: """Call out to Sagemaker inference endpoint. Args: prompt: The prompt to pass into the model. stop: Optional list of stop words to use when generating. Returns: The string generated by the model. Example: .. code-block:: python response = se("Tell me a joke.") """ _model_kwargs = self.model_kwargs or {} _model_kwargs = {**_model_kwargs, **kwargs} _endpoint_kwargs = self.endpoint_kwargs or {} body = self.content_handler.transform_input(prompt, _model_kwargs) content_type = self.content_handler.content_type accepts = self.content_handler.accepts if self.streaming and run_manager: completion: str = "" for chunk in self._stream(prompt, stop, run_manager, **kwargs): completion += chunk.text return completion try: response = self.client.invoke_endpoint( EndpointName=self.endpoint_name, Body=body, ContentType=content_type, Accept=accepts, **_endpoint_kwargs, ) except Exception as e: raise ValueError(f"Error raised by inference endpoint: {e}") text = self.content_handler.transform_output(response["Body"]) if stop is not None: text = enforce_stop_tokens(text, stop) return text