【大模型】RAG实践
为了便捷构建 LLM 应用,需要基于本地部署的LLM模型,自定义一个 LLM 类,将LLM接入到 LangChain 框架中。完成自定义 LLM 类之后,可以以完全一致的方式调用 LangChain 的接口,而无需考虑底层模型调用的不一致。已经使用OpenAI的接口标准部署了大模型,通过以下方式注册模型,接入LangChain。api_url: str = Field(..., descripti
前言
使用Langhain与自定义的LLM、Embedding模型实现RAG 。
1.自定义大模型
为了便捷构建 LLM 应用,需要基于本地部署的LLM模型,自定义一个 LLM 类,将LLM接入到 LangChain 框架中。完成自定义 LLM 类之后,可以以完全一致的方式调用 LangChain 的接口,而无需考虑底层模型调用的不一致。
已经使用OpenAI的接口标准部署了大模型,通过以下方式注册模型,接入LangChain。
from langchain.llms.base import LLM
from typing import Any, List, Optional
from langchain.callbacks.manager import CallbackManagerForLLMRun
import requests
import json
from pydantic import BaseModel, Field
class Qwen2_LLM(LLM, BaseModel):
api_url: str = Field(..., description="远程API的URL")
@property
def _llm_type(self) -> str:
return "Qwen2_LLM"
def __init__(self, **data: Any):
super().__init__(**data)
print(f"API URL设置为: {self.api_url}")
def _call(self, prompt: str, stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any) -> str:
"""
调用API并返回模型响应
"""
# 设置系统消息和用户输入
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
# 请求API
try:
response = requests.post(
f"{self.api_url}/chatglm4",
headers={"Authorization": "Bearer YOUR_API_KEY"}, # 替换为实际的API密钥
json={
"model": "glm-4",
"messages": messages,
"stream": False,
"max_tokens": 512,
"temperature": 0.4,
"presence_penalty": 1.2,
"top_p": 0.8
}
)
response.raise_for_status() # 检查HTTP错误
result = response.json()
# 提取生成的文本
return result['choices'][0]['message']['content']
except requests.exceptions.HTTPError as http_err:
print(f"HTTP错误发生: {http_err}")
except Exception as e:
print(f"请求API时发生错误: {e}")
return "" # 在出现错误时返回空字符串或其他适当的默认值
2.自定义Embedding
在LangChain中对文本做向量化一般是通过HuggingFaceEmbeddings方法,将模型路径传进去,然后将embedding方法作为参数传到生成向量的方法中。代码如下。
embeddings = HuggingFaceEmbeddings(model_name="/ssd/dongzhenheng/Pretrain_Model/sentence_transformers/m3e-base")
docsearch = Chroma.from_documents(split_docs, embeddings, persist_directory="./vector_store")
在本次实践中, embedding方法是调用已经部署好服务的接口,接入LangChain还需要再做一次封装,具体代码如下。
# 向量生成函数
def sentence_embedding(query_list):
data = json.dumps({"model_name": "bge-large-zh", "queries": query_list})
LLM_EMBEDDING_URL = "http://xxx.xxx.com/v1/embeddings"
response = requests.post(LLM_EMBEDDING_URL, headers={'Content-Type': 'application/json'}, data=data)
return response.json()['data'][0]['embedding']
# 自定义嵌入类,模拟一个具有 embed_documents 方法的类
class CustomEmbeddingFunction:
def embed_documents(self, texts):
return [sentence_embedding([text]) for text in texts]
def embed_query(self, query):
return sentence_embedding([query])
3.查询扩展
3.1 生成多个相同的query
# 使用大模型生成查询扩展
def query_expansion(query, api_url="http://xxx-xxx.flyme.com/v1"):
prompt = f"""请根据以下查询生成 5 个不同但相关的查询:
查询:{query}
生成的查询列表格式为:
1. [第一个查询]
2. [第二个查询]
3. [第三个查询]
4. [第四个查询]
5. [第五个查询]"""
data = {
"prompt": prompt
}
response = requests.post(api_url, headers={'Content-Type': 'application/json'}, json=data)
result = response.json().get("result", "")
expansions = []
lines = result.strip().split('\n')
for line in lines:
if line.startswith(('1.', '2.', '3.', '4.', '5.')):
expansions.append(line.split('[')[1].split(']')[0].strip())
return expansions
3.2 HyDE
# HyDE 函数
def hyde(question, api_url="http://xxx-xxx.flyme.com/v1"):
prompt = f"""Please write a passage to answer the question\n
Try to include as many key details as possible.\n
{question}"""
data = {
"prompt": prompt
}
response = requests.post(api_url, headers={'Content-Type': 'application/json'}, json=data)
hypothetical_document = response.json().get("result", "")
embedding = sentence_embedding([hypothetical_document])
return embedding
4. 重排序
# 加载重排序模型
tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-reranker-v2-m3")
model = AutoModelForSequenceClassification.from_pretrained("BAAI/bge-reranker-v2-m3")
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
# 重排序函数
def rerank(query, documents):
inputs = tokenizer([(query, doc.page_content) for doc in documents], return_tensors='pt', padding=True, truncation=True, max_length=512).to(device)
with torch.no_grad():
scores = model(**inputs).logits.view(-1,).cpu().tolist()
sorted_docs = [doc for _, doc in sorted(zip(scores, documents), reverse=True)]
return sorted_docs
5.整体流程
import json
import requests
import os
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import Chroma
from LLM import Qwen2_LLM # 假设你有Qwen2模型用于对话
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch
# 向量生成函数
def sentence_embedding(query_list):
data = json.dumps({"model_name": "bge-large-zh", "queries": query_list})
LLM_EMBEDDING_URL = "http://xxx.xxx/v1/embeddings"
response = requests.post(LLM_EMBEDDING_URL, headers={'Content-Type': 'application/json'}, data=data)
return response.json()['data'][0]['embedding']
# 自定义嵌入类,模拟一个具有 embed_documents 方法的类
class CustomEmbeddingFunction:
def embed_documents(self, texts):
return [sentence_embedding([text]) for text in texts]
def embed_query(self, query):
return sentence_embedding([query])
# 加载重排序模型
tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-reranker-v2-m3")
model = AutoModelForSequenceClassification.from_pretrained("BAAI/bge-reranker-v2-m3")
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
# 重排序函数
def rerank(query, documents):
inputs = tokenizer([(query, doc.page_content) for doc in documents], return_tensors='pt', padding=True, truncation=True, max_length=512).to(device)
with torch.no_grad():
scores = model(**inputs).logits.view(-1,).cpu().tolist()
sorted_docs = [doc for _, doc in sorted(zip(scores, documents), reverse=True)]
return sorted_docs
# 使用大模型生成查询扩展
def query_expansion(query, api_url="http://xxx-xxx.flyme.com/v1"):
prompt = f"""请根据以下查询生成 5 个不同但相关的查询:
查询:{query}
生成的查询列表格式为:
1. [第一个查询]
2. [第二个查询]
3. [第三个查询]
4. [第四个查询]
5. [第五个查询]"""
data = {
"prompt": prompt
}
response = requests.post(api_url, headers={'Content-Type': 'application/json'}, json=data)
result = response.json().get("result", "")
expansions = []
lines = result.strip().split('\n')
for line in lines:
if line.startswith(('1.', '2.', '3.', '4.', '5.')):
expansions.append(line.split('[')[1].split(']')[0].strip())
return expansions
# HyDE 函数
def hyde(query, api_url="http://xxx-xxx.flyme.com/v1"):
prompt = f"""请根据以下查询生成一篇详细的假设性文档:
查询:{query}"""
data = {
"prompt": prompt
}
response = requests.post(api_url, headers={'Content-Type': 'application/json'}, json=data)
hypothetical_document = response.json().get("result", "")
embedding = sentence_embedding([hypothetical_document])
return embedding
# 定义 ChatDoc 类
class ChatDoc():
def __init__(self, file_path, db_path="./chroma_db", db_name="default_db"):
self.file_path = file_path
self.splitText = []
self.db_path = db_path
self.db_name = db_name
# 定义对话提示模板
self.prompt_template = """你是一个问答机器人。你的任务是根据下述给定的已知信息回答用户问题。
确保你的回复完全依据下述已知信息。不要编造答案。
如果下述已知信息不足以回答用户的问题,请直接回复"我无法回答您的问题"。
已知信息:\n{info}\n用户问:\n{question}\n请用中文回答用户问题。"""
# 读取文件内容
def getFile(self):
with open(self.file_path, 'r', encoding='utf-8') as f:
return f.read()
# 切分文本
def splitSentences(self):
full_text = self.getFile()
if full_text:
text_splitter = CharacterTextSplitter(
separator="。",
chunk_size=100,
chunk_overlap=20,
length_function=len,
add_start_index=True
)
texts = text_splitter.create_documents([full_text])
#self.splitText = texts
return texts
# 检查是否有现存的向量数据库
def load_or_create_db(self):
db_dir = os.path.join(self.db_path, self.db_name)
# 如果数据库已存在,加载它
if os.path.exists(db_dir):
print(f"加载现有数据库: {db_dir}")
db = Chroma(persist_directory=db_dir, embedding_function=CustomEmbeddingFunction())
else:
# 如果不存在,则处理数据,生成向量并创建数据库
texts = self.splitSentences()
self.splitText = texts
print(f"创建新的数据库: {db_dir}")
texts = [doc.page_content for doc in self.splitText]
#embeddings = [sentence_embedding([text])[0] for text in texts]
# 使用自定义的嵌入函数
embedding_functions = CustomEmbeddingFunction()
# 创建 Chroma 数据库
db = Chroma.from_texts(
texts=texts,
embedding=embedding_functions,
persist_directory=db_dir
)
db.persist() # 存储数据库到本地
return db
# 查找相关文档
def askAndFindFiles(self, question):
db = self.load_or_create_db()
retriever = db.as_retriever(search_type="similarity", search_kwargs={'k': 3})
relevant_docs = retriever.get_relevant_documents(query=question)
return relevant_docs
# 处理用户问题并返回答案
def chatWithDoc(self, question, use_hyde=False):
all_relevant_docs = []
# 查询扩展
expanded_queries = query_expansion(question)
for query in expanded_queries:
relevant_docs = self.askAndFindFiles(query)
all_relevant_docs.extend(relevant_docs)
if use_hyde:
# 使用 HyDE 进行查询扩展
hyde_embedding = hyde(question)
db = self.load_or_create_db()
retriever = db.as_retriever(search_type="similarity", search_kwargs={'k': 3})
hyde_docs = retriever.get_relevant_documents(query=hyde_embedding)
all_relevant_docs.extend(hyde_docs)
# 去重
unique_docs = list(set(all_relevant_docs))
# 检查是否有交集
if len(unique_docs) < len(all_relevant_docs):
final_docs = unique_docs
else:
final_docs = self.askAndFindFiles(question)
# 重排序
final_docs = rerank(question, final_docs)
context = final_docs
print(context)
_content = "".join([doc.page_content for doc in context])
# 格式化消息模板
messages = self.prompt_template.format(info=_content, question=question)
chat = Qwen2_LLM(api_url="http://xxx-xxx.flyme.com/v1")
return chat(messages)
# 主程序运行
if __name__ == "__main__":
chat_doc = ChatDoc('文本.docx', db_path="./chroma_db", db_name="my_custom_db")
# 使用 HyDE
response = chat_doc.chatWithDoc('被展示全手工扎染围巾的在宣传片是在哪播放的', use_hyde=True)
print(response)
# 不使用 HyDE
response = chat_doc.chatWithDoc('被展示全手工扎染围巾的在宣传片是在哪播放的', use_hyde=False)
print(response)
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)