在之前的教程中,我们逐步升级了 RAG 系统:从基础的 Naive RAG 到更精准的 Retrieve-and-Rerank RAG,再到能理解图像和音频的 Multimodal RAG。然而,这些方法在处理复杂问题,特别是需要多跳推理或理解实体之间深层关系的场景时,仍可能力有不逮。

想象一下,如果你问 AI:“苹果公司的 CEO 是谁?他的前任是谁?这家公司最知名的产品是什么?” 传统的 RAG 可能会在文档中找到“苹果公司的 CEO 是蒂姆·库克”,但要连接到“蒂姆·库克的前任”和“苹果公司的产品”这类信息,并进行逻辑推理,就显得有些吃力了。这是因为它们主要依赖于文本片段的表面相似性,而非深层次的结构化知识。

这就是 Graph RAG (图 RAG) 登场的时候了!Graph RAG 是一种创新方法,它将 知识图谱 (Knowledge Graph, KG) 的强大能力与 RAG 框架相结合。它的核心理念是:

  1. 结构化知识存储:将非结构化的文本信息转化为实体 (Entities)关系 (Relations),存储在一个易于查询和推理的图结构中。

  2. 多跳推理能力:通过遍历知识图谱中的节点和边,系统可以执行多步推理,回答涉及复杂关联的问题。

  3. 可解释的推理路径:由于信息以图的形式组织,系统可以清晰地展示其如何从起点“跳跃”到终点,提高了答案的可解释性

  4. 子图检索替代文本块检索:不再仅仅检索原始文本块,而是检索与查询相关的**局部知识图谱(子图)**作为上下文,为大语言模型提供更结构化、更丰富的背景信息。

Graph RAG 是 RAG 领域的一个重要进展,它弥补了传统 RAG 在处理复杂知识和推理方面的不足,为构建更智能、更透明的 AI 应用提供了新的途径。


1. Graph RAG 的工作原理:构建与推理知识图谱

Graph RAG 的核心在于将非结构化数据转化为结构化的知识图谱,并在此基础上进行推理。其工作流程可以概括为以下几个关键阶段:

阶段一:知识图谱构建(Knowledge Graph Construction)

这是 Graph RAG 的基础。我们需要从原始文档中抽取关键信息,并将其组织成图结构。

  • 实体抽取 (Entity Extraction):识别文本中的命名实体,例如人名、地名、组织、时间等。

  • 关系抽取 (Relation Extraction):识别实体之间存在的语义关系,例如“位于 (located_in)”、“是 CEO (is_CEO_of)”、“生产 (produces)”。

  • 知识图谱存储 (Knowledge Graph Storage):将抽取出的实体和关系存储到专门的图数据库(如 Neo4j)或内存图结构(如 NetworkX)中。每个实体表示为一个节点 (Node),每种关系表示为连接节点的边 (Edge)

  • 图嵌入 (Graph Embedding):可选但推荐的步骤。将知识图谱中的实体和关系编码为低维向量(图嵌入),这有助于后续基于相似性的检索和图神经网络的推理。

阶段二:查询理解与子图检索(Query Understanding & Subgraph Retrieval)

当用户提出问题时,Graph RAG 需要将问题映射到知识图谱上,并检索相关的知识片段。

  • 实体链接 (Entity Linking):识别用户查询中的实体,并将其映射到知识图谱中已有的实体节点。例如,如果查询提到“苹果”,系统会将其链接到知识图谱中的“苹果公司”节点。

  • 子图检索 (Subgraph Retrieval):以查询中识别出的实体为起点,在知识图谱中检索与其相关的局部子图。这个子图包含了该实体及其在特定“跳数 (hops)”内相关联的其他实体和关系。这比简单地检索文本块更能捕捉深层语义。

阶段三:图推理与答案生成(Graph Reasoning & Answer Generation)

检索到相关的子图后,系统需要利用这些结构化信息进行推理,并生成最终答案。

  • 图推理 (Graph Reasoning):可以采用多种方法:

    • 路径查找 (Pathfinding):在子图中寻找从查询实体到目标实体之间的路径,揭示多跳关系。

    • 图神经网络 (Graph Neural Networks, GNNs):将子图转化为图结构数据,通过 GNN 对其进行编码,捕捉图的结构信息和实体之间的交互模式,生成图级别或节点级别的表示。

    • 逻辑推理 (Logical Reasoning):基于图中的事实进行演绎或归纳推理。

  • 答案生成 (Answer Generation):将推理结果(通常以结构化形式呈现,如三元组列表或自然语言描述的推理路径)以及原始查询,作为上下文输入给大语言模型 (LLM)。LLM 利用这些结构化知识来生成准确、可解释的答案。


2. 动手实践:构建你的第一个 Graph RAG 系统

现在,我们来看一看如何用代码实现 Graph RAG 的核心功能。

2.1 核心组件概览

我们将实现以下关键模块:

  • EntityRelationExtractor (实体关系抽取器):从文本中识别实体和关系。

  • KnowledgeGraph (知识图谱):存储实体和关系,并支持子图查询。

  • GraphEmbedder (图嵌入器):将图结构编码为向量表示(本例中结合了简单的GNN)。

  • GraphRAG (主控制器):整合所有组件,调度整个 Graph RAG 流程。

2.2 实体关系抽取 (EntityRelationExtractor)

import spacy
from typing import List, Dict, Tuple, Any

class EntityRelationExtractor:
    def __init__(self, model: str = "en_core_web_sm"):
        """
        初始化实体关系抽取器。
        需要先下载 spaCy 模型:python -m spacy download en_core_web_sm
        """
        try:
            self.nlp = spacy.load(model)
            print(f"spaCy model '{model}' loaded successfully.")
        except OSError:
            print(f"spaCy model '{model}' not found. Please run: python -m spacy download {model}")
            raise

    def extract_entities(self, text: str) -> List[Dict[str, Any]]:
        """从文本中抽取命名实体。"""
        doc = self.nlp(text)
        entities = []
        for ent in doc.ents:
            entities.append({
                "text": ent.text,
                "label": ent.label_, # 实体类型(如PERSON, ORG, GPE等)
                "start": ent.start_char,
                "end": ent.end_char
            })
        return entities

    def extract_relations(self, text: str) -> List[Dict[str, str]]:
        """
        从文本中抽取简单的三元组关系 (主语-谓语-宾语)。
        这里使用基于依存句法分析的启发式方法,实际生产中可能需要更复杂的模型。
        """
        doc = self.nlp(text)
        relations = []
        for sent in doc.sents: # 遍历句子
            # 寻找谓语(通常是句子的根动词)
            for token in sent:
                if token.dep_ == "ROOT" and token.pos_ == "VERB":
                    subject = None
                    obj = None
                    
                    # 寻找主语和宾语
                    # 注意:这只是一个非常简化的启发式方法
                    # 生产级关系抽取会使用更复杂的规则或预训练模型 (如 OpenIE, Llama-cpp-agent等)
                    for child in token.children:
                        if child.dep_ in ["nsubj", "nsubjpass"]: # 名词主语,被动主语
                            subject = child.text
                        elif child.dep_ in ["dobj", "pobj", "attr", "acomp"]: # 直接宾语,介词宾语,属性,形容词补语
                            obj = child.text
                        
                        # 尝试通过介词短语获取宾语
                        if child.dep_ == "prep" and child.head == token:
                            for grand_child in child.children:
                                if grand_child.dep_ == "pobj":
                                    obj = grand_child.text
                                    break # 找到第一个介词宾语
                                    
                    if subject and obj:
                        relations.append({
                            "subject": subject,
                            "predicate": token.lemma_, # 谓语使用词形还原形式
                            "object": obj
                        })
        return relations
    
    def extract_triplets(self, text: str) -> Dict[str, Any]:
        """同时抽取实体和关系三元组。"""
        return {
            "entities": self.extract_entities(text),
            "relations": self.extract_relations(text)
        }

代码解析:

  • EntityRelationExtractor: 负责从非结构化文本中提取结构化信息。

    • 它使用 spaCy 库,一个流行的自然语言处理 (NLP) 库,进行实体识别和依存句法分析。

    • extract_entities 方法识别文本中的命名实体(如人名、组织名等),并返回它们的文本、类型和位置。

    • extract_relations 方法是一个简化的关系抽取示例,它通过遍历句子的依存句法树,寻找动词作为谓语,并尝试识别其主语和宾语,从而形成主语-谓语-宾语的三元组。请注意,这只是一个非常基础的实现,在实际场景中,关系抽取通常会使用更复杂的规则、模式匹配或深度学习模型(如基于 Transformer 的关系抽取模型)。

2.3 知识图谱 (KnowledgeGraph)

import networkx as nx
from py2neo import Graph, Node, Relationship # 用于 Neo4j 交互
from typing import Optional, List, Dict, Any, Tuple

class KnowledgeGraph:
    def __init__(self, use_neo4j: bool = False, neo4j_uri: str = "bolt://localhost:7687", 
                 neo4j_auth: Tuple[str, str] = ("neo4j", "password")):
        self.use_neo4j = use_neo4j
        self.neo4j_uri = neo4j_uri
        self.neo4j_auth = neo4j_auth
        
        if use_neo4j:
            try:
                # 尝试连接 Neo4j 数据库
                self.graph = Graph(self.neo4j_uri, auth=self.neo4j_auth)
                # 尝试执行一个简单查询来验证连接
                self.graph.run("RETURN 1").data()
                print("Connected to Neo4j database.")
            except Exception as e:
                print(f"Could not connect to Neo4j: {e}. Falling back to NetworkX.")
                self.graph = nx.DiGraph() # 如果连接失败,则回退到内存图
                self.use_neo4j = False
        else:
            self.graph = nx.DiGraph() # 使用 NetworkX 作为内存图

    def add_entity(self, name: str, entity_type: str = "Entity", properties: Optional[Dict[str, Any]] = None):
        """向知识图谱添加实体(节点)。"""
        if self.use_neo4j:
            # 查找现有节点或创建新节点
            node = self.graph.nodes.match(name=name).first()
            if not node:
                node = Node(entity_type, name=name, **(properties or {}))
                self.graph.create(node)
        else:
            if name not in self.graph: # 避免重复添加节点
                self.graph.add_node(name, type=entity_type, **(properties or {}))
    
    def add_relation(self, subject: str, predicate: str, obj: str, properties: Optional[Dict[str, Any]] = None):
        """向知识图谱添加关系(边)。"""
        if self.use_neo4j:
            # 确保主语和宾语节点存在
            subj_node = self.graph.nodes.match(name=subject).first()
            if not subj_node:
                subj_node = Node("Entity", name=subject)
                self.graph.create(subj_node)
            
            obj_node = self.graph.nodes.match(name=obj).first()
            if not obj_node:
                obj_node = Node("Entity", name=obj)
                self.graph.create(obj_node)
            
            # 创建关系
            rel = Relationship(subj_node, predicate, obj_node, **(properties or {}))
            self.graph.create(rel)
        else:
            # 确保节点存在
            if subject not in self.graph:
                self.graph.add_node(subject, type="Entity")
            if obj not in self.graph:
                self.graph.add_node(obj, type="Entity")
            
            # 添加有向边
            self.graph.add_edge(subject, obj, relation=predicate, **(properties or {}))
    
    def build_from_documents(self, documents: List[str], extractor: 'EntityRelationExtractor'):
        """从文档列表中构建知识图谱。"""
        print(f"Building knowledge graph from {len(documents)} documents...")
        for i, doc_text in enumerate(documents):
            # print(f"Processing document {i+1}/{len(documents)}...")
            triplets = extractor.extract_triplets(doc_text)
            
            # 添加实体
            for entity in triplets["entities"]:
                self.add_entity(entity["text"], entity["label"])
            
            # 添加关系
            for relation in triplets["relations"]:
                self.add_relation(
                    relation["subject"],
                    relation["predicate"],
                    relation["object"]
                )
        print("Knowledge graph built.")

    def find_subgraph(self, entity: str, max_hops: int = 2) -> Dict[str, Any]:
        """
        以给定实体为中心,查找其在知识图谱中指定跳数内的子图。
        返回包含节点和边的字典。
        """
        if self.use_neo4j:
            # 使用 Cypher 查询在 Neo4j 中查找子图
            # 需要安装 Neo4j 插件 apoc.path.subgraphAll
            query = """
            MATCH (start {name: $entity})
            CALL apoc.path.subgraphAll(start, {maxLevel: $hops})
            YIELD nodes, relationships
            RETURN nodes, relationships
            """
            result = self.graph.run(query, entity=entity, hops=max_hops).single()
            
            if result:
                # 将 Neo4j 的结果转换为通用格式
                subgraph_nodes = []
                for n in result["nodes"]:
                    node_props = dict(n) # 获取所有属性
                    node_props['name'] = node_props.pop('name', 'Unknown') # 确保有name
                    subgraph_nodes.append(node_props)
                
                subgraph_edges = []
                for r in result["relationships"]:
                    subgraph_edges.append((r.start_node["name"], r.type, r.end_node["name"]))
                
                return {
                    "nodes": subgraph_nodes,
                    "edges": subgraph_edges
                }
            else:
                return {"nodes": [], "edges": []}
        else:
            # 使用 NetworkX 进行 BFS 搜索子图
            if entity not in self.graph:
                return {"nodes": [], "edges": []}
            
            visited_nodes = set()
            queue = [(entity, 0)] # (节点, 当前跳数)
            
            while queue:
                current_node, current_hops = queue.pop(0)
                if current_node in visited_nodes or current_hops > max_hops:
                    continue
                
                visited_nodes.add(current_node)
                
                # 添加邻居节点
                for neighbor in self.graph.neighbors(current_node):
                    if neighbor not in visited_nodes:
                        queue.append((neighbor, current_hops + 1))
                # 对于无向图或需要反向查找,也需要添加前驱节点
                for predecessor in self.graph.predecessors(current_node):
                     if predecessor not in visited_nodes:
                        queue.append((predecessor, current_hops + 1))
            
            # 构建子图信息
            subgraph_nx = self.graph.subgraph(visited_nodes)
            
            nodes_info = []
            for node_name, data in subgraph_nx.nodes(data=True):
                nodes_info.append({"name": node_name, **data})
            
            edges_info = []
            for u, v, data in subgraph_nx.edges(data=True):
                edges_info.append((u, data.get("relation", "RELATED"), v))
            
            return {
                "nodes": nodes_info,
                "edges": edges_info
            }

代码解析:

  • KnowledgeGraph: 核心是存储和查询知识图谱。

    • 它支持两种后端:

      • networkx (默认): 一个用于创建、操作和研究复杂网络结构的 Python 包,数据存储在内存中,适合小规模或原型。

      • py2neo (集成 Neo4j): 一个用于与 Neo4j 图数据库交互的库。Neo4j 是一个流行的原生图数据库,适合处理大规模、复杂的关系数据,并支持高效的图查询语言 Cypher。

    • add_entityadd_relation 方法用于向图中添加节点和边。

    • build_from_documents 方法利用 EntityRelationExtractor 从一批文档中抽取三元组,并将其添加到知识图谱中。

    • find_subgraph 方法是 Graph RAG 的关键之一,它根据给定的实体和跳数,从知识图谱中检索相关的局部子图。对于 networkx,它使用 BFS (广度优先搜索) 来查找;对于 Neo4j,它执行 Cypher 查询。

2.4 图神经网络推理 (GraphReasoningNet, GraphEmbedder)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv # 这是一个流行的图神经网络层
import numpy as np
from typing import List, Dict, Any, Tuple

# 确保安装了 torch_geometric:
# pip install torch_geometric
# 同时需要安装相应的 torch_scatter, torch_sparse, torch_cluster, torch_spline_conv
# 可以参考 PyTorch Geometric 官方安装指南:https://pytorch-geometric.readthedocs.io/en/latest/install/installation.html

class GraphReasoningNet(nn.Module):
    """一个简单的图卷积神经网络 (GCN) 用于学习图表示"""
    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
        super().__init__()
        # 两层 GCN 卷积层
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, output_dim)
        # 注意力机制层,用于聚合节点特征到图级别表示
        self.attention = nn.Linear(output_dim, 1) 
    
    def forward(self, x: torch.Tensor, edge_index: torch.Tensor) -> torch.Tensor:
        """
        x: 节点特征矩阵 (num_nodes, input_dim)
        edge_index: 边索引矩阵 (2, num_edges)
        """
        # 第一层 GCN,ReLU激活
        x = F.relu(self.conv1(x, edge_index))
        # 第二层 GCN
        x = self.conv2(x, edge_index)
        
        # 图级表示:使用注意力机制聚合所有节点特征
        # attention_weights 形状: (num_nodes, 1)
        attention_weights = F.softmax(self.attention(x), dim=0) 
        # 加权求和得到图嵌入
        graph_emb = torch.sum(attention_weights * x, dim=0, keepdim=True)
        
        return graph_emb # (1, output_dim)

class GraphEmbedder:
    def __init__(self, embedding_dim: int = 128, gnn_output_dim: int = 32):
        self.embedding_dim = embedding_dim # 初始实体嵌入维度
        self.gnn_output_dim = gnn_output_dim # GNN 输出的图嵌入维度
        self.entity_embeddings: Dict[str, np.ndarray] = {} # 存储实体名称到其向量的映射
        
        # 初始化 GNN 模型
        self.gnn = GraphReasoningNet(embedding_dim, embedding_dim // 2, gnn_output_dim)
        # 将 GNN 移动到 GPU (如果可用)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.gnn.to(self.device)
    
    def _get_entity_embedding(self, entity_name: str) -> np.ndarray:
        """获取或创建实体的初始嵌入。实际应用中可以用预训练的实体嵌入。"""
        if entity_name not in self.entity_embeddings:
            # 简单随机初始化,生产环境应使用预训练的词向量或实体嵌入模型
            self.entity_embeddings[entity_name] = np.random.normal(0, 0.1, self.embedding_dim).astype('float32')
        return self.entity_embeddings[entity_name]
    
    def embed_subgraph(self, subgraph: Dict[str, Any]) -> np.ndarray:
        """
        将子图编码为图级别嵌入。
        subgraph: 包含 'nodes' (节点列表,每个节点有'name'属性) 和 'edges' (边列表,每个边是(subj, rel, obj)元组)。
        """
        nodes = subgraph["nodes"]
        edges = subgraph["edges"]
        
        if not nodes: # 如果子图为空
            return np.zeros(self.gnn_output_dim, dtype='float32')
        
        # 1. 准备节点特征 (x)
        node_names = [node["name"] for node in nodes]
        # 为每个节点获取其初始嵌入
        node_features = [self._get_entity_embedding(name) for name in node_names]
        x = torch.tensor(node_features, dtype=torch.float).to(self.device)
        
        # 2. 准备边索引 (edge_index)
        edge_index_list = []
        node_to_idx = {name: i for i, name in enumerate(node_names)}
        
        for subj, rel, obj in edges:
            # 确保边连接的节点在当前子图的节点列表中
            if subj in node_to_idx and obj in node_to_idx:
                edge_index_list.append([node_to_idx[subj], node_to_idx[obj]])
        
        if edge_index_list:
            edge_index = torch.tensor(edge_index_list, dtype=torch.long).t().contiguous().to(self.device)
        else: # 如果没有边,创建一个空的边索引,但确保形状正确
            edge_index = torch.empty((2, 0), dtype=torch.long).to(self.device)
        
        # 3. 通过 GNN 进行推理并生成图嵌入
        self.gnn.eval() # 设置为评估模式
        with torch.no_grad(): # 不计算梯度
            graph_embedding = self.gnn(x, edge_index)
        
        return graph_embedding.cpu().numpy().flatten() # 移回CPU并展平为一维数组

代码解析:

  • GraphReasoningNet: 这是一个简化的图神经网络 (GNN) 模型。

    • 它使用了 PyTorch Geometric 库中的 GCNConv (图卷积网络) 层。GNN 能够聚合节点邻居的信息,从而学习到节点的上下文信息,并最终生成图的整体表示(图嵌入)。

    • 这里还加入了一个简单的注意力机制 (self.attention),用于从所有节点的学习到的特征中聚合出一个单一的图级别嵌入,代表整个子图的语义信息。

  • GraphEmbedder: 负责为知识图谱中的实体和子图生成嵌入。

    • _get_entity_embedding: 为每个实体提供一个初始向量表示。在实际应用中,这些初始嵌入可以来自预训练的词向量 (如 Word2Vec, GloVe) 或更先进的实体嵌入模型 (如 TransE, ComplEx)。这里为了简化,使用了随机初始化。

    • embed_subgraph: 这是关键方法,它将一个子图的节点特征和边结构输入到 GraphReasoningNet 中,最终得到代表整个子图的低维向量(图嵌入)。这个图嵌入捕获了子图的结构化信息和语义。

2.5 主控制器 (GraphRAG)

from sentence_transformers import SentenceTransformer # 用于查询实体的语义匹配
from openai import OpenAI
from typing import List, Dict, Any, Union, Optional
import numpy as np

class GraphRAG:
    def __init__(self, knowledge_graph: KnowledgeGraph, extractor: EntityRelationExtractor, 
                 embedder: GraphEmbedder, llm_model: str = "gpt-3.5-turbo"):
        self.kg = knowledge_graph # 知识图谱实例
        self.extractor = extractor # 实体关系抽取器实例
        self.embedder = embedder # 图嵌入器实例
        self.client = OpenAI() # OpenAI 客户端
        self.llm_model = llm_model
        
        # 用于查询中的实体链接,特别是当查询中的实体表述不完全一致时
        # 可以用更复杂的实体链接模型,这里使用简单的语义相似度
        self.entity_encoder = SentenceTransformer('all-MiniLM-L6-v2') 
        print("SentenceTransformer model 'all-MiniLM-L6-v2' loaded.")
        
    def build_index(self, documents: List[str]):
        """构建知识图谱"""
        self.kg.build_from_documents(documents, self.extractor)
        
    def _link_query_entities_to_kg(self, query_entities: List[str]) -> List[str]:
        """
        将查询中抽取的实体链接到知识图谱中已有的实体。
        这里使用简单的语义相似度匹配,实际生产中会更复杂(如上下文、消歧)。
        """
        linked_entities = []
        kg_entities = list(self.kg.graph.nodes()) # 获取所有图中的实体
        
        if not kg_entities:
            return []

        # 编码知识图谱中的所有实体
        kg_entity_embeddings = self.entity_encoder.encode(kg_entities)
        
        for q_ent in query_entities:
            q_ent_embedding = self.entity_encoder.encode([q_ent])[0]
            
            # 计算查询实体与知识图谱中所有实体的余弦相似度
            similarities = np.dot(kg_entity_embeddings, q_ent_embedding) / \
                           (np.linalg.norm(kg_entity_embeddings, axis=1) * np.linalg.norm(q_ent_embedding) + 1e-8)
            
            # 找到最相似的实体
            most_similar_idx = np.argmax(similarities)
            if similarities[most_similar_idx] > 0.6: # 设置一个相似度阈值
                linked_entities.append(kg_entities[most_similar_idx])
            else:
                print(f"Warning: Query entity '{q_ent}' not confidently linked to KG.")
        return linked_entities

    def retrieve_subgraphs(self, query: str, max_subgraphs: int = 3, max_hops: int = 2) -> List[Dict[str, Any]]:
        """
        从知识图谱中检索与查询相关的子图。
        max_subgraphs: 最多返回的子图数量。
        max_hops: 每个子图的最大跳数。
        """
        print(f"提取查询实体: {query}")
        query_entities_raw = self.extractor.extract_entities(query)
        query_entity_names = [ent["text"] for ent in query_entities_raw]
        
        print(f"链接查询实体到知识图谱: {query_entity_names}")
        linked_entities = self._link_query_entities_to_kg(query_entity_names)
        
        subgraphs = []
        retrieved_entities = set() # 避免重复检索同一个实体的子图
        
        print(f"从链接实体中检索子图: {linked_entities}")
        for entity in linked_entities:
            if entity not in retrieved_entities:
                subgraph_data = self.kg.find_subgraph(entity, max_hops=max_hops)
                if subgraph_data["nodes"]: # 确保子图不为空
                    subgraphs.append({
                        "center_entity": entity,
                        "subgraph": subgraph_data
                    })
                    retrieved_entities.add(entity)
                    if len(subgraphs) >= max_subgraphs:
                        break # 达到最大子图数量
        
        print(f"检索到 {len(subgraphs)} 个相关子图。")
        return subgraphs
    
    def generate_answer(self, query: str, subgraphs: List[Dict[str, Any]]) -> str:
        """基于检索到的子图和图推理结果生成答案。"""
        if not subgraphs:
            return "未在知识图谱中找到与您问题相关的任何信息。请尝试换个问法。"
        
        # 将子图信息格式化为LLM可理解的上下文
        context_parts = []
        for i, sg_info in enumerate(subgraphs):
            subgraph = sg_info["subgraph"]
            
            nodes_text = [f"实体: {node.get('name', 'N/A')} (类型: {node.get('type', 'Unknown')})" for node in subgraph["nodes"]]
            relations_text = [f"关系: {s} --{r}--> {o}" for s, r, o in subgraph["edges"]]
            
            context_parts.append(f"""
--- 相关知识图谱子图 {i+1} (中心实体: {sg_info['center_entity']}) ---
节点信息:
{chr(10).join(nodes_text)}
关系信息:
{chr(10).join(relations_text)}
------------------------------------------------
""")
        
        context = "\n".join(context_parts)
        
        # 构造给 LLM 的 prompt,明确要求基于图谱信息进行推理并说明过程
        prompt = f"""你是一个基于知识图谱进行复杂关系推理的AI助手。请根据以下提供的知识图谱信息,回答用户的问题。
你的答案必须基于提供的信息,并尽可能详细地说明你的推理过程,引用知识图谱中的实体和关系。如果信息不足,请明确说明。

以下是与问题相关的知识图谱片段:
{context}

用户问题: {query}

请基于上述知识图谱信息,分析实体和关系,然后给出答案和清晰的推理路径:
"""
        
        print("正在调用大语言模型生成答案...")
        try:
            response = self.client.chat.completions.create(
                model=self.llm_model,
                messages=[
                    {"role": "system", "content": "你是一个基于知识图谱进行推理的AI助手。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1, # 较低的温度,鼓励模型忠实于提供的上下文
                max_tokens=1000
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error calling LLM: {e}")
            return "抱歉,在生成答案时遇到了问题,请稍后再试或检查API配置。"
    
    def query(self, question: str, max_subgraphs: int = 3, max_hops: int = 2) -> Dict[str, Any]:
        """
        主查询接口,整合整个 Graph RAG 流程。
        question: 用户查询的文本。
        max_subgraphs: 检索的最大子图数量。
        max_hops: 每个子图的最大跳数。
        """
        print(f"\n--- 接收到查询: '{question}' ---")
        
        # 检索相关子图
        subgraphs = self.retrieve_subgraphs(question, max_subgraphs, max_hops)
        
        # 如果需要图嵌入推理,可以在这里进行 (本示例中 GNN 嵌入用于提供上下文,而非直接检索)
        # 例如,可以计算查询嵌入与子图嵌入的相似度,用于 rerank 子图。
        # subgraph_embeddings = self.embedder.batch_embed_subgraphs([sg['subgraph'] for sg in subgraphs])
        
        # 生成答案
        answer = self.generate_answer(question, subgraphs)
        
        return {
            "answer": answer,
            "reasoning_subgraphs": subgraphs, # 返回用于推理的子图,以便调试和可解释性
            "num_subgraphs_retrieved": len(subgraphs)
        }

代码解析:

  • GraphRAG: 这是整个 Graph RAG 系统的核心调度器。

    • __init__: 初始化知识图谱、抽取器、图嵌入器和 OpenAI LLM 客户端。它还引入了 SentenceTransformer 用于实体链接,通过计算查询实体与知识图谱中现有实体的语义相似度来找到最匹配的实体。

    • build_index: 简单地调用 knowledge_graph.build_from_documents 来构建知识图谱。

    • _link_query_entities_to_kg: 这是一个关键的辅助方法,用于将用户查询中识别的实体(可能只是一个文本片段)精确地匹配到知识图谱中已有的标准化实体节点上。这有助于确保后续的子图检索的准确性。

    • retrieve_subgraphs: 这是 RAG 中的“检索”部分,但在这里,它不再是简单的文本块检索,而是通过识别查询中的实体,并调用 knowledge_graph.find_subgraph 来获取相关的结构化子图

    • generate_answer: 这是 RAG 中的“生成”部分。它将检索到的子图信息(以易于 LLM 理解的文本形式呈现,包含节点和关系)作为上下文,与原始查询一起发送给大语言模型。LLM 借助这些结构化的上下文信息,进行逻辑推理并生成答案,并要求 LLM 说明推理过程,增强可解释性。

    • query: 整个系统的入口点,协调上述所有步骤。


3. 使用示例:让你的 Graph RAG 动起来

为了运行以下示例,你需要做一些准备:

  1. 安装必要的库:

    pip install spacy networkx py2neo torch torch-geometric openai sentence-transformers numpy
    # 安装 spaCy 模型(重要!)
    python -m spacy download en_core_web_sm
    
  2. 设置 OpenAI API Key: 确保你的 OPENAI_API_KEY 环境变量已配置,因为 GraphRAG 使用 OpenAI API 进行答案生成。

import os
from graph_rag_tutorial import EntityRelationExtractor, KnowledgeGraph, GraphEmbedder, GraphRAG
# 确保导入了所有必要的类

# 设置 OpenAI API Key (请替换为你的真实 Key)
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

# 初始化组件
extractor = EntityRelationExtractor()
# kg = KnowledgeGraph(use_neo4j=False) # 使用 NetworkX 作为内存图谱
kg = KnowledgeGraph(use_neo4j=False) # 暂时禁用Neo4j以确保示例可在无Neo4j环境运行

embedder = GraphEmbedder()
graph_rag = GraphRAG(kg, extractor, embedder)

# 知识文档
documents = [
    "史蒂夫·乔布斯于1976年与史蒂夫·沃兹尼亚克共同创立了苹果公司。",
    "蒂姆·库克在2011年接替史蒂夫·乔布斯成为苹果公司的CEO。",
    "苹果公司总部位于加利福尼亚州的库比蒂诺。",
    "iPhone是苹果公司的主要产品,由富士康负责制造。",
    "富士康是一家总部位于台湾的跨国电子产品制造商。",
    "史蒂夫·乔布斯还创立了NeXT电脑公司,该公司后来被苹果公司收购。",
    "苹果公司设计了macOS操作系统,用于其Mac系列电脑。"
]

# 构建知识图谱
print("--- 开始构建知识图谱 ---")
graph_rag.build_index(documents)
print("--- 知识图谱构建完成 ---")


# --- 查询示例 ---

# 1. 简单查询:直接关系
print("\n=== 查询 1: 简单关系 ===")
query_1 = "谁是苹果公司的CEO?"
result_1 = graph_rag.query(query_1)
print(f"**问题**: {query_1}")
print(f"**答案**: {result_1['answer']}")
print(f"**检索到的子图数量**: {result_1['num_subgraphs_retrieved']}")
# print(f"**用于推理的子图**: {result_1['reasoning_subgraphs']}") # 打印详细子图信息 (可选)


# 2. 多跳推理:CEO的前任
print("\n=== 查询 2: 多跳推理 ===")
query_2 = "蒂姆·库克的前任是谁?"
result_2 = graph_rag.query(query_2)
print(f"**问题**: {query_2}")
print(f"**答案**: {result_2['answer']}")
print(f"**检索到的子图数量**: {result_2['num_subgraphs_retrieved']}")


# 3. 多跳推理:产品制造商的总部
print("\n=== 查询 3: 多跳推理 (跨实体) ===")
query_3 = "为iPhone制造产品的公司总部在哪里?"
result_3 = graph_rag.query(query_3)
print(f"**问题**: {query_3}")
print(f"**答案**: {result_3['answer']}")
print(f"**检索到的子图数量**: {result_3['num_subgraphs_retrieved']}")


# 4. 未知信息查询
print("\n=== 查询 4: 未知信息 ===")
query_4 = "苹果公司有多少名员工?"
result_4 = graph_rag.query(query_4)
print(f"**问题**: {query_4}")
print(f"**答案**: {result_4['answer']}")
print(f"**检索到的子图数量**: {result_4['num_subgraphs_retrieved']}")

# 5. 关于一个实体的多方面信息
print("\n=== 查询 5: 多方面信息 ===")
query_5 = "告诉我关于史蒂夫·乔布斯的信息。"
result_5 = graph_rag.query(query_5)
print(f"**问题**: {query_5}")
print(f"**答案**: {result_5['answer']}")
print(f"**检索到的子图数量**: {result_5['num_subgraphs_retrieved']}")


4. 配置管理:灵活调整你的 Graph RAG

我们可以使用 YAML 文件来集中管理 Graph RAG 系统的各种配置参数,这使得系统更易于配置和维护。

# graph_rag_config.yaml
graph_rag:
  knowledge_graph:
    backend: "networkx"  # 或 "neo4j",选择知识图谱存储后端
    neo4j_uri: "bolt://localhost:7687" # Neo4j 连接URI
    neo4j_auth_user: "neo4j"
    neo4j_auth_password: "password"
    max_hops: 2 # 检索子图的最大跳数
  
  extraction:
    spacy_model: "en_core_web_sm" # SpaCy 实体关系抽取模型
    
  embedding:
    entity_embedding_dim: 128 # 初始实体嵌入维度
    graph_embedding_dim: 32 # GNN 输出的图嵌入维度
    
  retrieval:
    max_subgraphs: 3 # 检索的最大子图数量
    entity_linking_threshold: 0.6 # 实体链接的语义相似度阈值

  generation:
    llm_model: "gpt-3.5-turbo" # 大语言模型
    temperature: 0.1 # LLM 生成的随机性
    max_tokens: 1000 # LLM 生成的最大 token 数

5. 性能优化:让 Graph RAG 运行更高效

Graph RAG 的性能瓶颈通常在于知识图谱的构建(实体关系抽取)、子图检索和 GNN 推理。

5.1 子图缓存(Subgraph Caching)

对于频繁查询的实体或子图,可以将检索结果缓存起来,避免重复计算。

from functools import lru_cache
from typing import List, Dict, Any, Tuple

class CachedGraphRAG(GraphRAG):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 将 find_subgraph 方法包装为带缓存的版本
        # 注意: lru_cache 只能缓存可哈希的参数,所以这里直接缓存kg实例的find_subgraph方法
        # 更好的做法是在KnowledgeGraph类中实现_cached_find_subgraph方法
        # 为了演示,这里直接在retrieve_subgraphs中调用缓存版本
        self._cached_find_subgraph_internal = lru_cache(maxsize=1024)(self.kg.find_subgraph)
    
    def retrieve_subgraphs(self, query: str, max_subgraphs: int = 3, max_hops: int = 2) -> List[Dict[str, Any]]:
        """
        检索相关子图,使用缓存机制。
        """
        query_entities_raw = self.extractor.extract_entities(query)
        query_entity_names = [ent["text"] for ent in query_entities_raw]
        linked_entities = self._link_query_entities_to_kg(query_entity_names)
        
        subgraphs = []
        retrieved_entities = set()
        
        for entity in linked_entities:
            if entity not in retrieved_entities:
                # 使用缓存的 find_subgraph 方法
                subgraph_data = self._cached_find_subgraph_internal(entity, max_hops) 
                if subgraph_data["nodes"]:
                    subgraphs.append({
                        "center_entity": entity,
                        "subgraph": subgraph_data
                    })
                    retrieved_entities.add(entity)
                    if len(subgraphs) >= max_subgraphs:
                        break
        return subgraphs

代码解析:

  • @lru_cache: Python 内置的装饰器,用于实现 LRU (Least Recently Used) 缓存策略。它会自动缓存函数的输入和输出。

  • 通过将 kg.find_subgraph 方法包装在 lru_cache 中,当相同的实体和跳数作为查询参数再次出现时,系统会直接从缓存中返回结果,避免重复进行图遍历操作,从而提高检索效率。

5.2 批量图处理(Batch Graph Processing)

在 GNN 推理阶段,如果需要对多个子图进行嵌入,可以考虑批量处理以提高 GPU 利用率。

import torch
import numpy as np
from typing import List, Dict, Any

def batch_embed_subgraphs(subgraphs_list: List[Dict[str, Any]], embedder: GraphEmbedder) -> np.ndarray:
    """
    批量处理多个子图,生成它们的图嵌入。
    subgraphs_list: 包含多个子图数据(每个子图是find_subgraph返回的字典)的列表。
    embedder: GraphEmbedder 实例。
    返回: 多个子图的嵌入组成的 NumPy 数组。
    """
    embeddings = []
    
    # 收集所有节点的特征和边信息,构建一个大的批处理图(如果有需要)
    # 或者简单地遍历每个子图并进行嵌入(如下所示,每次调用GNN)
    
    print(f"Batch embedding {len(subgraphs_list)} subgraphs...")
    for sg_data in subgraphs_list:
        # 确保传入的是原始的 subgraph data,而不是整个 sg_info 字典
        embedding = embedder.embed_subgraph(sg_data)
        embeddings.append(embedding)
    
    if embeddings:
        return np.array(embeddings)
    else:
        return np.array([]) # 返回空数组如果没有任何子图

代码解析:

  • batch_embed_subgraphs: 这个函数接受一个子图列表,并遍历每个子图,调用 embedder.embed_subgraph 来获取其嵌入。虽然这里仍然是循环调用单个子图嵌入,但如果底层 GraphEmbedder 的 GNN 支持更大的批处理(例如,将多个小图构建成一个大图并一次性送入 GNN),则效率会更高。


6. 应用场景:Graph RAG 的独特价值

Graph RAG 在需要深度理解实体间关系复杂推理的场景中,展现出传统 RAG 无法比拟的优势:

  • 医疗健康:

    • 疾病诊断辅助: 查询“某种药物与哪些副作用相关,这些副作用又与哪些疾病相关?”进行多跳推理,帮助医生理解复杂的病理生理和药物影响。

    • 个性化治疗方案: 基于患者的基因数据、病史、药物过敏等实体,检索相关药物、治疗方法,并推理最佳方案。

  • 金融风控:

    • 欺诈检测: 分析账户、交易、IP 地址之间的复杂关系网络,识别异常模式和潜在的欺诈团伙。

    • 投资分析: 关联公司、行业、市场、高管、新闻事件等信息,进行多维度分析和风险评估。

  • 法律领域:

    • 案例分析: 关联法条、判例、当事人、罪名等实体,推理案件的适用法律和潜在判决结果。

    • 合同审查: 识别合同中的关键条款、权利义务、实体,并发现潜在的冲突或风险。

  • 科研探索与知识发现:

    • 文献综述: 从大量论文中抽取概念、方法、结果、作者等实体及它们之间的关系,快速构建某个领域的知识图谱,辅助科研人员发现新的关联和研究方向。

    • 药物发现: 探索化合物、靶点、疾病之间的潜在关联,加速新药研发。

  • 智能客服/问答系统: 处理如“谁是 XXX 的父亲?他的职业是什么?他发明了什么?”这类需要多步关联和推理的问题。


7. 评估指标:衡量 Graph RAG 的效果

评估 Graph RAG 的性能比传统 RAG 更复杂,因为它涉及知识图谱的质量、推理能力和最终答案的生成。

指标

说明

目标值

推理准确率

多跳推理答案的正确性。

>85%

路径完整性

系统能否提供完整、可信的推理路径。

>80%

子图相关性

检索到的子图与用户查询的关联程度。

>75%

响应延迟

端到端查询时间,包括实体抽取、图谱查询等。

<800ms

知识图谱覆盖率

知识图谱对领域知识的覆盖程度。

实体链接准确率

查询实体与知识图谱实体匹配的准确性。

>90%


8. 部署建议:将 Graph RAG 投入生产

部署 Graph RAG 系统需要额外考虑图数据库的部署和维护。

8.1 Docker 部署 (带 Neo4j)

使用 docker-compose 是部署包含 Neo4j 图数据库的 Graph RAG 系统的便捷方式。

# Dockerfile (位于你的 Graph RAG 项目根目录)
FROM python:3.9

# 安装系统依赖(如 Neo4j 的 Java 运行时如果需要,或 SpaCy 的语言模型依赖)
# 通常 spaCy 的 en_core_web_sm 模型下载后不需要额外系统依赖。
# 如果使用 Neo4j Python驱动,它底层依赖 bolt 协议,不需要Java。

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 下载 spaCy 语言模型(重要!)
RUN python -m spacy download en_core_web_sm

# 复制你的应用代码
COPY . /app
WORKDIR /app

# 暴露你的应用服务端口 (如果你的 Graph RAG 暴露为 API)
EXPOSE 8000

# 启动命令 (示例,根据你的实际应用入口调整)
CMD ["python", "app.py"]
# docker-compose.yml (与 Dockerfile 同级)
version: '3.8'

services:
  neo4j:
    image: neo4j:4.4 # 使用 Neo4j 官方镜像
    container_name: neo4j-graph-db
    ports:
      - "7474:7474" # Neo4j Browser 端口
      - "7687:7687" # Bolt 协议端口,供 py2neo 连接
    environment:
      - NEO4J_AUTH=neo4j/password # 替换为你的密码
      - NEO4J_HEAP_SIZE=1G # 根据需要调整内存
      # 更多 Neo4j 配置可以参考官方文档
    volumes:
      - ./neo4j_data:/data # 持久化数据

  graph-rag-app:
    build: . # 根据当前目录的 Dockerfile 构建
    container_name: graph-rag-service
    depends_on:
      - neo4j # 确保 Neo4j 在 Graph RAG 应用启动前运行
    ports:
      - "8000:8000" # 如果你的 Graph RAG 服务运行在 8000 端口
    environment:
      # 传递 Neo4j 连接信息给你的 Graph RAG 应用
      - NEO4J_URI=bolt://neo4j:7687
      - NEO4J_USER=neo4j
      - NEO4J_PASSWORD=password
      - OPENAI_API_KEY=${OPENAI_API_KEY} # 从宿主机环境变量获取
    # 如果你的应用需要访问本地文件(如日志、配置),可以挂载卷
    volumes:
      - ./app:/app # 将本地 app 目录挂载到容器中

部署要点:

  • 图数据库: 对于大规模知识图谱,强烈推荐使用专业的图数据库(如 Neo4j, ArangoDB, Amazon Neptune 等),它们提供高效的存储、查询和扩展能力。

  • 资源分配: 图数据库和 GNN 都在计算上密集。为它们分配足够的 CPU、内存和 GPU 资源至关重要。

  • 数据持久化: 确保图数据库的数据卷被正确挂载,以防止容器重启导致数据丢失。


9. 总结:Graph RAG 的强大与挑战

Graph RAG 是 RAG 领域迈向更高智能的关键一步。它通过将非结构化文本转化为结构化的知识图谱,从而让 RAG 系统具备了强大的逻辑推理多跳问答能力。

核心优势:

  • 支持多跳逻辑推理: 能够回答需要跨越多个事实和关系的复杂问题。

  • 提供可解释的推理路径: 由于知识以图的形式组织,系统可以清晰地展示其如何得出答案,增强了透明度和用户信任。

  • 处理复杂关系查询: 更好地理解实体间的深层语义关系,超越简单的关键词匹配。

  • 知识结构化存储: 有利于知识的长期管理、更新和一致性维护。

面临的挑战:

  • 知识图谱构建成本高: 从非结构化文本中自动、准确地抽取实体和关系是一个复杂且耗时的过程,需要高质量的抽取模型和大量标注数据。

  • 实体关系抽取准确性: 抽取的质量直接影响知识图谱的准确性和推理能力。

  • 图数据库维护复杂性: 部署、管理和扩展图数据库需要专业的知识和资源。

  • 计算复杂度较高: 图遍历、GNN 推理等操作可能带来较高的计算开销和延迟。

  • 知识图谱动态更新: 随着新信息的出现,如何高效、准确地更新和维护知识图谱是一个持续的挑战。

适用场景:

  • 需要逻辑推理的领域: 如医疗、金融、法律、科研等。

  • 关系密集的知识体系: 例如社交网络分析、供应链管理等。

  • 需要解释性的应用: 对 AI 决策过程的透明度有高要求的场景。

  • 结构化数据丰富的场景: 尽管 Graph RAG 主要从非结构化文本构建图谱,但如果已有部分结构化数据,可以更好地融合。


预告:RAG 进阶之旅,更高层级的智能...

我们已经深入探讨了 Naive RAG、Retrieve-and-Rerank RAG、Multimodal RAG,以及今天的 Graph RAG。我们的 RAG 智能系统正在不断进化,变得越来越强大!

接下来,我们将探索如何让 RAG 能够处理更复杂的知识结构,进行更深层次的推理:

  • Hybrid RAG (混合 RAG):如何更智能地融合多种检索策略,不仅仅是向量和关键词,还包括更高级的路由决策和结果融合算法,应对企业级多源数据的挑战?

  • Agentic RAG Router (智能体路由):如何引入 LLM 驱动的智能代理,让 RAG 系统能够根据用户意图,动态选择和协调不同的工具或 RAG 流程,实现自适应的问答?

  • Agentic RAG Multi-Agent (多智能体):如何构建一个多专家协作的 RAG 系统,让不同的专业智能体分工合作,甚至通过“辩论”来达成共识,处理最复杂的查询?

每一次迭代,RAG 都离真正的通用人工智能更近一步。你准备好迎接这些更高级的挑战了吗?


持续学习,才能玩转AI!

这篇RAG入门教程助你启程。想获取:

  • 最新AI架构趋势深度解读

  • RAG、MCP及LLM应用实战教程与代码

  • 精选学习资源与高效工具包

  • 技术答疑与同行交流

👉 欢迎关注 【AI架构笔记】!

扫码 / 搜一搜:AI架构笔记,一起进阶,驾驭AI!

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐