When trying to solve a Question Answering task over a larger document corpus with the help of LLMs we need to master the following challenges:
How to manage large document(s) that exceed the token limit
How to find the document(s) relevant to the question being asked
When trying to infuse knowledge into a generative AI-powered application, we must choose which of these types to target—fine-tuning deals with elevating the parametric knowledge through fine-tuning. Since fine-tuning is a resource-intensive operation, this option is well suited for infusing static domain-specific information like domain-specific language/writing styles (medical domain, science domain, etc) or optimizing performance towards a particular task (classification, sentiment analysis, RLHF, instruction-finetuning, etc).
In contrast to that, targeting the source knowledge for domain-specific performance uplift is very well suited for all kinds of dynamic information, from knowledge bases in structured and unstructured forms up to the integration of information from live systems. This article concerns retrieval-augmented generation, a common design pattern for ingesting domain-specific information through source knowledge. It is particularly well suited for ingesting details in the form of unstructured text with semi-frequent update cycles.
Before being able to answer the questions, the documents must be processed and stored in a document store index
Load the documents
Process and split them into smaller chunks
Create a numerical vector representation of each chunk using the Amazon Bedrock Titan Embeddings model
Create an index using the chunks and the corresponding embeddings
Singlestore is a great option for getting a cloud-based vector Database.
Image credits: Singlestore
When the documents index is prepared, you are ready to ask the questions and relevant documents will be fetched based on the question being asked. The following steps will be executed:
Create an embedding of the input question
Compare the question embedding with the embeddings in the index
Fetch the (top N) relevant document chunks
Add those chunks as part of the context in the prompt
Send the prompt to the model under Amazon Bedrock
Get the contextual answer based on the documents retrieved
To follow the RAG approach we can use the LangChain framework which has integrations with different services and tools that allow efficient building of patterns such as RAG. We will be using the following tools:
LLM (Large Language Model): Anthropic Claude available through Amazon Bedrock. This model will be used to understand the document chunks and provide an answer in a human-friendly manner.
Embeddings Model: Amazon Titan Embeddings are available through Amazon Bedrock. This model will be used to generate a numerical representation of the textual documents
Document Loader: PDF Loader available through LangChain for PDFs. These are loaders that can load the documents from a source, for the sake of this notebook we are loading the sample files from a local path. This could easily be replaced with a loader to load documents from enterprise internal systems.
Vector Store: SingleStoreDB is available through LangChain. We can use SingleStoreDB to store both the embeddings and the documents.
Then begin with instantiating the LLM and the Embeddings model. Here we are using Anthropic Claude to demonstrate the use case.
Code:
prerequisites:
!pip install boto3==1.34.74 langchain==0.1.14 pypdf==4.1.0 tiktoken==0.6.0 SQLAlchemy==2.0.29 --quiet
import getpass
os.environ['AWS_DEFAULT_REGION']='us-east-1'
os.environ['AWS_ACCESS_KEY_ID']= getpass.getpass("AWS_ACCESS_KEY_ID: ")
os.environ['AWS_SECRET_ACCESS_KEY']=getpass.getpass("AWS_SECRET_ACCESS_KEY: ")
import boto3
import json
import sys
from io import StringIO
import sys
import textwrap
def print_ww(*args, width: int = 140, **kwargs):
"""Like print(), but wraps output to `width` characters (default 140)"""
buffer = StringIO()
try:
_stdout = sys.stdout
sys.stdout = buffer
print(*args, **kwargs)
output = buffer.getvalue()
finally:
sys.stdout = _stdout
for line in output.splitlines():
print("\n".join(textwrap.wrap(line, width=width)))
session = boto3.session.Session()
bedrock_client = session.client('bedrock')
Setup Langchain
We create an instance of the Bedrock classes for the LLM and the embedding models. At the time of writing, Bedrock supports one embedding model and therefore we do not need to specify any model id.
bedrock_runtime_client = session.client('bedrock-runtime')
# We will be using the Titan Embeddings Model to generate our Embeddings.
from langchain.embeddings import BedrockEmbeddings
from langchain.llms.bedrock import Bedrock
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
# - create the Anthropic Model
llm = Bedrock(model_id="anthropic.claude-v2",
client=bedrock_runtime_client,
model_kwargs={
'max_tokens_to_sample': 200
},
callbacks=[StreamingStdOutCallbackHandler()])
# - create the Titan Embeddings Model
bedrock_embeddings = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1",
client=bedrock_runtime_client)
Data Preparation
Let's first download some of the files to build our document store.
In this example, we will use several years of Amazon's Letter to Shareholders as a text corpus to perform Q&A on.
!mkdir -p ./RAG_Bedrock_data
from urllib.request import urlretrieve
urls = [
'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf',
'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf',
'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf',
'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf'
]
filenames = [
'AMZN-2022-Shareholder-Letter.pdf',
'AMZN-2021-Shareholder-Letter.pdf',
'AMZN-2020-Shareholder-Letter.pdf',
'AMZN-2019-Shareholder-Letter.pdf'
]
metadata = [
dict(year=2022, source=filenames[0]),
dict(year=2021, source=filenames[1]),
dict(year=2020, source=filenames[2]),
dict(year=2019, source=filenames[3])]
data_root = "./RAG_Bedrock_data/"
for idx, url in enumerate(urls):
file_path = data_root + filenames[idx]
urlretrieve(url, file_path)
As part of Amazon's culture, the CEO always includes a copy of the 1997 Letter to Shareholders with every new release. This will cause repetition, take longer to generate embeddings, and may skew your results. So we will take the downloaded data, trim the 1997 letter (last 3 pages), and overwrite them as processed files.
from pypdf import PdfReader, PdfWriter
import glob
local_pdfs = glob.glob(data_root + '*.pdf')
for local_pdf in local_pdfs:
pdf_reader = PdfReader(local_pdf)
pdf_writer = PdfWriter()
for pagenum in range(len(pdf_reader.pages)-3):
page = pdf_reader.pages[pagenum]
pdf_writer.add_page(page)
with open(local_pdf, 'wb') as new_file:
new_file.seek(0)
pdf_writer.write(new_file)
new_file.truncate()
After downloading we can load the documents with the help of DirectoryLoader from PyPDF available under LangChain and split them into smaller chunks.
Note: The retrieved document/text should be large enough to contain enough information to answer a question; but small enough to fit into the LLM prompt. Also, the embeddings model has a limit of the length of input tokens limited to 512 tokens, which roughly translates to ~2000 characters. For the sake of this use-case we are creating chunks of roughly 1000 characters with an overlap of 100 characters using RecursiveCharacterTextSplitter.
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
documents = []
for idx, file in enumerate(filenames):
loader = PyPDFLoader(data_root + file)
document = loader.load()
for document_fragment in document:
document_fragment.metadata = metadata[idx]
#print(f'{len(document)} {document}\n')
documents += document
# - in our testing Character split works better with this PDF data set
text_splitter = RecursiveCharacterTextSplitter(
# Set a really small chunk size, just to show.
chunk_size = 1000,
chunk_overlap = 100,
)
docs = text_splitter.split_documents(documents)
Before we proceed we are looking into some interesting statistics regarding the document preprocessing we just performed:
avg_doc_length = lambda documents: sum([len(doc.page_content) for doc in documents])//len(documents)
print(f'Average length among {len(documents)} documents loaded is {avg_doc_length(documents)} characters.')
print(f'After the split we have {len(docs)} documents as opposed to the original {len(documents)}.')
print(f'Average length among {len(docs)} documents (after split) is {avg_doc_length(docs)} characters.')
We had 4 PDF documents which have been split into smaller chunks.
Now we can see what a sample embedding would look like for one of those chunks.
sample_embedding = np.array(bedrock_embeddings.embed_query(docs[0].page_content))
print("Sample embedding of a document chunk: ", sample_embedding)
print("Size of the embedding: ", sample_embedding.shape)
Following the very same approach embeddings can be generated for the entire corpus and stored in a vector store.
This can be easily done using SingleStoreDB implementation inside LangChain which takes input the embeddings model and the documents to create the entire vector store.
from langchain.vectorstores import SingleStoreDB
db = SingleStoreDB.from_documents(
docs,
bedrock_embeddings,
table_name = "amazon_data"
)
Similarity Search
Here you will set your search query, and look for documents that match.
query = "How has AWS evolved?"
The first step would be to create an embedding of the query such that it could be compared with the documents
query_embedding = bedrock_embeddings.embed_query("This is a content of the document")
np.array(query_embedding)
Basic Similarity Search
The results that come back from the similarity_search_with_score
API is sorted by score from highest to lowest. The score value is represented by the Dot product. Higher scores are better, for normalized vector embeddings this would approach 1.
results_with_scores = db.similarity_search_with_score(query)
for doc, score in results_with_scores:
print(f"Content: {doc.page_content}\nMetadata: {doc.metadata}\nScore: {score}\n\n")
Similarity Search with Metadata Filtering
Additionally, you can provide metadata to your query to filter the scope of your results. The filter
parameter for search queries is a dictionary of metadata key/value pairs that will be matched to results to include/exclude them from your query.
filter = dict(year=2022)
results_with_scores = db.similarity_search_with_score(query, filter=filter)
for doc, score in results_with_scores:
print(f"Content: {doc.page_content}\nMetadata: {doc.metadata}, Score: {score}\n\n")
Top-K Matching
Top-K Matching is a filtering technique that involves a 2 stage approach.
Perform a similarity search, returning the top K matches.
Apply your metadata filter on the smaller resultset.
Note: A caveat for Top-K matching is that if the value for K is too small, there is a chance that after filtering there will be no results to return.
Using Top-K matching requires 2 values:
k
, the max number of results to return at the end of our queryfetch_k
, the max number of results to return from the similarity search before applying filters
results = db.similarity_search(query, filter=filter, k=2, fetch_k=4)
for doc in results:
print(f"Content: {doc.page_content}\nMetadata: {doc.metadata}\n\n")
Now we have the relevant documents, it's time to use the LLM to generate an answer based on these documents.
We will take our initial prompt, together with our relevant documents which were retrieved based on the results of our similarity search. We then by combining these create a prompt that we feed back to the model to get our result. At this point our model should give us highly informed information on how we can change the tire of our specific car as it was outlined in our manual.
LangChain provides an abstraction of how this can be done easily.
Customizable option
In the above scenario, we explored a quick and easy way to get a context-aware answer to your question. Now let's have a look at a more customizable option with the help of RetrievalQA where you can customize how the documents fetched should be added to prompt using chain_type
parameter. Also, if you want to control how many relevant documents should be retrieved then change the k
parameter in the cell below to see different outputs. In many scenarios you might want to know which were the source documents that the LLM used to generate the answer, you can get those documents in the output using return_source_documents
which returns the documents that are added to the context of the LLM prompt. RetrievalQA
also allows you to provide a custom prompt template which can be specific to the model.
Note: In this example we are using Anthropic Claude as the LLM under Amazon Bedrock, this particular model performs best if the inputs are provided under Human:
and the model is requested to generate an output after Assistant:
. In the cell below you see an example of how to control the prompt such that the LLM stays grounded and doesn't answer outside the context.
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
prompt_template = """
Human: Use the following pieces of context to provide a concise answer to the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.
{context}
Question: {question}
Assistant:"""
PROMPT = PromptTemplate(
template=prompt_template, input_variables=["context", "question"]
)
qa = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=db.as_retriever(
search_type="similarity", search_kwargs={"k": 3}
),
return_source_documents=True,
chain_type_kwargs={"prompt": PROMPT},
callbacks=[StreamingStdOutCallbackHandler()]
)
query = "How did AWS evolve?"
result = qa.invoke({"query": query})
print(f'Query: {result["query"]}\n')
print_ww(f'Result: {result["result"]}\n')
print(f'\nContext Documents: ')
for srcdoc in result["source_documents"]:
print_ww(f'{srcdoc}\n')
query = "Why is Amazon successful?"
result = qa.invoke({"query": query})
print(f'Query: {result["query"]}\n')
print_ww(f'Result: {result["result"]}\n')
print(f'\nContext Documents: ')
for srcdoc in result["source_documents"]:
print_ww(f'{srcdoc}\n')
query = "What business challenges has Amazon experienced?"
result = qa.invoke({"query": query})
print(f'Query: {result["query"]}\n')
print_ww(f'Result: {result["result"]}\n')
print(f'\nContext Documents: ')
for srcdoc in result["source_documents"]:
print_ww(f'{srcdoc}\n')
Clean up
Clear the downloaded PDFs and the amazon_data
table
!rm -rf ./RAG_Bedrock_data
%%sql
DROP TABLE IF EXISTS amazon_data
In the above implementation of RAG-based Question Answering we have explored the following concepts and how to implement them using Amazon Bedrock and its LangChain integration.
Loading documents of different kinds and generating embeddings to create a vector store
Retrieving documents to the question
Preparing a prompt that goes as input to the LLM
Present an answer in a human-friendly manner
Takeaways
Leverage various models available under Amazon Bedrock to see alternate outputs
Explore options such as persistent storage of embeddings and document chunks
Integration with enterprise data stores