Supercharge SageMaker with Embeddings
Introduction
Deep Lake is a vector database that integrates with Amazon SageMaker, allowing you to store embeddings, the vector representations of data used in deep learning models. By leveraging Deep Lake’s vector database capabilities, developers can accelerate the training and deployment of their deep learning models. In this blog post, we explore the details of this integration and how a vector database can enhance the performance and accuracy of deep learning models.
Getting Started
Step 1: Installing required libraries and authenticating with Deep Lake and SageMaker
First, we will install everything we’ll need.
!python3 -m pip install --upgrade langchain deeplake sagemaker tiktoken boto3
You’ll need to authenticate with both Deep Lake and AWS. You can get an API token from the Deep Lake platform.
!aws configure set aws_access_key_id [REDACTED]
!aws configure set aws_secret_access_key [REDACTED]
!aws configure set default.region us-east-1
!activeloop login -t [REDACTED]
!export ACTIVELOOP_USERNAME=[REDACTED]
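If you prefer to configure the Deep Lake credentials from within Python (for example in a notebook), you can set the corresponding environment variables directly; the values below are placeholders, not real credentials.
import os
# Placeholder values -- substitute your own Activeloop credentials.
os.environ["ACTIVELOOP_TOKEN"] = "<your-activeloop-token>"
os.environ["ACTIVELOOP_USERNAME"] = "<your-activeloop-username>"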
Now you can proceed to deploy the SageMaker endpoint as usual:
import sagemaker
from sagemaker.huggingface.model import HuggingFaceModel

sess = sagemaker.Session()
sagemaker_session_bucket = sess.default_bucket()
role = sagemaker.get_execution_role()

model_name = "all-MiniLM-L6-v2"  # alternatives: "mpt-7b-instruct", "dolly-v2-12b", "flan-t5-xxl"

huggingface_model = HuggingFaceModel(
    model_data=f"s3://{sess.default_bucket()}/{model_name}/model.tar.gz",
    role=role,
    transformers_version="4.26",
    pytorch_version="1.13",
    py_version="py39",
    model_server_workers=1,
)

predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.g5.8xlarge",  # or "ml.g5.4xlarge"
    endpoint_name=model_name,
    model_data_download_timeout=3600,
    container_startup_health_check_timeout=3600,
    update_endpoint=True,
)
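The deploy call above assumes a packaged model archive already exists at s3://<bucket>/all-MiniLM-L6-v2/model.tar.gz. If you still need to create it, a minimal sketch along the following lines downloads the model from the Hugging Face Hub, archives it, and uploads it to the session bucket. The sentence-transformers repo id is an assumption, and your archive must also contain inference code that returns embeddings under a "vectors" key for the content handler used later, so adapt this to your setup.
from huggingface_hub import snapshot_download
import tarfile
import sagemaker

sess = sagemaker.Session()
model_name = "all-MiniLM-L6-v2"

# Download the model weights locally (assumes the sentence-transformers repo id).
local_dir = snapshot_download(repo_id=f"sentence-transformers/{model_name}")

# Package the model directory as model.tar.gz, the layout SageMaker expects.
with tarfile.open("model.tar.gz", "w:gz") as tar:
    tar.add(local_dir, arcname=".")

# Upload the archive to the default session bucket under the model name prefix.
s3_uri = sess.upload_data("model.tar.gz", bucket=sess.default_bucket(), key_prefix=model_name)
print(s3_uri)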
After the endpoint is deployed, you can use the following code to create a SageMaker endpoint embeddings object:
import json
from typing import Dict, List

from langchain.embeddings import SagemakerEndpointEmbeddings
from langchain.embeddings.sagemaker_endpoint import EmbeddingsContentHandler


class ContentHandler(EmbeddingsContentHandler):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, inputs: List[str], model_kwargs: Dict) -> bytes:
        # Serialize the list of texts (plus any model kwargs) into the JSON payload
        # expected by the endpoint.
        input_str = json.dumps({"inputs": inputs, **model_kwargs})
        return input_str.encode("utf-8")

    def transform_output(self, output: bytes) -> List[List[float]]:
        # Parse the endpoint response and return one embedding vector per input text.
        response_json = json.loads(output.read().decode("utf-8"))
        return response_json["vectors"]


embeddings = SagemakerEndpointEmbeddings(
    endpoint_name="all-MiniLM-L6-v2",
    region_name="us-east-1",
    content_handler=ContentHandler(),
)
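As a quick sanity check, you can embed a single string through the endpoint and inspect the result (the query text here is just an illustrative example):
# Embed one query string through the SageMaker endpoint and check the vector size.
vector = embeddings.embed_query("How are Spark DataFrames partitioned?")
print(len(vector))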
Step 2: Indexing the Apache Spark Code Base
To index the code base, first clone the repository, parse the code, break it into chunks, and apply indexing:
!git clone --depth 1 https://github.com/apache/spark
Next, load all files inside the repository:
import os
from langchain.document_loaders import TextLoader

root_dir = "spark"
docs = []

for dirpath, dirnames, filenames in os.walk(root_dir):
    for file in filenames:
        try:
            # Load each file and split it into LangChain documents.
            loader = TextLoader(os.path.join(dirpath, file), encoding="utf-8")
            docs.extend(loader.load_and_split())
        except Exception:
            # Skip binary or otherwise unreadable files.
            pass
Subsequently, divide the loaded files into chunks:
from langchain.text_splitter import CharacterTextSplitter
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(docs)
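It can be helpful to confirm how many chunks the splitter produced before indexing them:
# Number of documents loaded and chunks produced by the splitter.
print(len(docs), len(texts))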
The indexing process takes roughly 4 minutes to compute the embeddings and upload them to Activeloop:
import os
from langchain.vectorstores import DeepLake

username = os.getenv("ACTIVELOOP_USERNAME")
dataset_path = f"hub://{username}/spark"

db = DeepLake(dataset_path=dataset_path, embedding_function=embeddings)
db.add_documents(texts)
If the dataset has already been created, you can load it later without recomputing the embeddings, as shown below.
Step 3: Conversational Retriever Chain
First, load the dataset, establish the retriever, and create the Conversational Chain. When the dataset is loaded, Deep Lake prints a summary of its structure:
Dataset(path=f"hub://{username}/spark", read_only=True, tensors=["embedding", "ids", "metadata", "text"])
The Deep Lake dataset serving as a VectorStore has 4 tensors: the embeddings, their ids, metadata (which includes the filename of each text chunk), and the text itself. A preview of the dataset looks like this:
| tensor    | htype   | shape         | dtype   | compression |
|-----------|---------|---------------|---------|-------------|
| embedding | generic | (23156, 1536) | float32 | None        |
| ids       | text    | (23156, 1)    | str     | None        |
| metadata  | json    | (23156, 1)    | str     | None        |
| text      | text    | (23156, 1)    | str     | None        |
from langchain.vectorstores import DeepLake
db = DeepLake(dataset_path=dataset_path, embedding_function=embeddings)
retriever = db.as_retriever()
retriever.search_kwargs["distance_metric"] = "cos"
retriever.search_kwargs["fetch_k"] = 100
retriever.search_kwargs["maximal_marginal_relevance"] = True
retriever.search_kwargs["k"] = 10
Connect to the LLM for question answering:
import json
from typing import Dict

from langchain import SagemakerEndpoint
from langchain.chains import RetrievalQA
from langchain.llms.sagemaker_endpoint import LLMContentHandler


class ContentHandler(LLMContentHandler):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
        # Wrap the prompt (plus any model kwargs) in the JSON payload expected by the endpoint.
        input_str = json.dumps({"inputs": prompt, **model_kwargs})
        return input_str.encode("utf-8")

    def transform_output(self, output: bytes) -> str:
        # Return the generated text from the endpoint response.
        response_json = json.loads(output.read().decode("utf-8"))
        return response_json[0]["generated_text"]


model = SagemakerEndpoint(
    # Point this at your deployed text-generation endpoint (e.g. "flan-t5-xxl"
    # from the list above) rather than the embedding model.
    endpoint_name="all-MiniLM-L6-v2",
    region_name="us-east-1",
    credentials_profile_name="default",
    content_handler=ContentHandler(),
)
qa = RetrievalQA.from_llm(model, retriever=retriever)
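With the chain assembled, you can ask questions about the code base; the question below is only an example:
# Ask a question; the retriever supplies relevant Spark code chunks as context for the LLM.
response = qa.run("Which class is responsible for DAG scheduling in Spark?")
print(response)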