在 Text2SQL 系统中,一个常见但致命的问题是:大模型面对几十甚至上百张表时,容易“看花眼”,生成错误的 JOIN 或字段引用
本文将带你拆解一套基于 BM25 语义匹配 + 中文分词的表过滤机制,让大模型只关注与用户问题最相关的几张表,大幅提升 SQL 生成准确率!

💡 核心思想
将用户自然语言查询 → 中文分词 → 与每张表的“语义文档”(表名+注释+字段注释)进行 BM25 相似度打分 → 动态筛选 Top-K 相关表 → 仅将这些表的 Schema 提供给大模型。

🧭 本文流程一览

✅ 环境依赖与全局配置 —— 分词、缓存、连接池
✅ 表结构自动提取 —— 从数据库动态获取字段、注释、外键
✅ 文本分词与文档构建 —— 为每张表构建可检索的语义文档
✅ BM25 相关性打分 —— 计算用户查询与每张表的匹配度
✅ 动态阈值过滤 —— 智能保留 Top 相关表(最多5张)
✅ 集成到 Agent State —— 无缝对接大模型推理流程


1️⃣ 环境依赖与全局配置

首先,我们导入必要的库,并初始化全局资源。

import json
import warnings

# 忽略 jieba 的 UserWarning(如未加载词典提示)
warnings.filterwarnings("ignore", category=UserWarning, module="jieba")

import logging
import re
from typing import Dict, List, Any, Optional
import jieba  # 中文分词库:https://github.com/fxsjy/jieba
import pandas as pd
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.expression import text
from rank_bm25 import BM25Okapi  # 轻量级 BM25 实现
from agent.text2sql.state.agent_state import AgentState, ExecutionResult
from model.db_connection_pool import get_db_pool
from functools import lru_cache  # 用于缓存 schema

# 全局数据库连接池(避免重复创建连接)
db_pool = get_db_pool()
logger = logging.getLogger(__name__)

🔍 说明:

  • 使用 jieba 进行中英文混合分词,支持中文语义理解。

  • rank_bm25 是一个高效、无依赖的 BM25 实现,适合轻量级语义匹配。

  • lru_cache 缓存整个数据库 schema,避免重复扫描(适用于 schema 不频繁变更的场景)。

2️⃣ 表结构自动提取

我们通过 SQLAlchemy 的 inspect 工具,自动获取所有表的元数据。

class DatabaseService:
    def __init__(self):
        self._engine = db_pool.get_engine()

    @lru_cache(maxsize=1)
    def _fetch_all_table_info(self) -> Dict[str, Dict]:
        """
        从数据库获取所有表的结构信息(带缓存)。
        """
        inspector = inspect(self._engine)
        table_info = {}

        for table_name in inspector.get_table_names():
            try:
                # 获取列信息:列名、类型、注释
                columns = {}
                for col in inspector.get_columns(table_name):
                    comment = str(col["comment"] or "")
                    columns[col["name"]] = {
                        "type": str(col["type"]),
                        "comment": comment,
                    }

                # 获取外键关系(用于后续图谱构建)
                foreign_keys = [
                    f"{fk['constrained_columns'][0]} -> {fk['referred_table']}.{fk['referred_columns'][0]}"
                    for fk in inspector.get_foreign_keys(table_name)
                ]

                # 单独获取表注释(MySQL 存储在 information_schema)
                table_comment = self._get_table_comment(table_name)

                table_info[table_name] = {
                    "columns": columns,
                    "foreign_keys": foreign_keys,
                    "table_comment": table_comment,
                }
            except Exception as e:
                logger.warning(f"读取表 {table_name} 结构失败: {e}")

        logger.info(f"成功加载 {len(table_info)} 张表的 schema 信息")
        return table_info

    def _get_table_comment(self, table_name: str) -> str:
        """
        从 information_schema 获取 MySQL 表注释。
        """
        try:
            query = text(
                """
                SELECT table_comment 
                FROM information_schema.tables 
                WHERE table_schema = DATABASE() 
                  AND table_name = :table_name;
            """
            )
            with self._engine.connect() as conn:
                result = conn.execute(query, {"table_name": table_name})
                row = result.fetchone()
                return row[0].strip() if row else""
        except Exception as e:
            logger.warning(f"无法获取表 {table_name} 的注释: {e}")
            return""

✅ 优势:

  • 自动识别字段注释(如“客户ID”、“订单创建时间”),这些是语义匹配的关键信号。

  • 外键信息虽未在过滤中直接使用,但可用于后续 Neo4j 图谱构建(参考前文)

3️⃣ 文本分词与文档构建

为每张表构建一个“语义文档”,包含表名、表注释、所有字段名和字段注释。

@staticmethod
def _tokenize_text(text_str: str) -> List[str]:
    """
    对文本进行中文/英文分词,过滤标点符号。
    """
    # 保留中文、英文、数字,其余替换为空格
    filtered_text = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9]", " ", text_str)
    tokens = jieba.lcut(filtered_text, cut_all=False)  # 精确模式
    return [token.strip() for token in tokens if token.strip()]

@staticmethod
def _build_document(table_name: str, table_info: dict) -> str:
    """
    构建用于 BM25 匹配的文档文本。
    """
    parts = [table_name]

    # 添加表注释(如“客户信息表”)
    table_comment = table_info.get("table_comment", "")
    if table_comment:
        parts.append(table_comment)

    # 添加每个字段的名称和注释
    for col_name, col_info in table_info.get("columns", {}).items():
        parts.append(col_name)
        parts.append(col_info.get("comment", ""))

    return" ".join(parts)

🌰 示例: 表 t_customers 的文档可能为: "t_customers 客户信息表 customer_id 客户ID customer_name 客户姓名 phone 联系电话 email 电子邮箱 ..."

4️⃣ BM25 相关性打分与动态提权

使用 BM25 计算用户查询与每张表的匹配得分,并对“表注释命中关键词”的表进行提权。

def _filter_relevant_tables_by_bm25(self, table_info: Dict[str, Dict], user_query: str) -> Dict[str, Dict]:
    if not user_query or not table_info:
        return table_info

    # 构建语料库
    corpus = []
    table_names = []
    table_comments = []
    for table_name, info in table_info.items():
        doc = self._build_document(table_name, info)
        corpus.append(doc)
        table_names.append(table_name)
        table_comments.append(info.get("table_comment", ""))

    # 分词
    tokenized_corpus = [self._tokenize_text(doc) for doc in corpus]
    query_tokens = self._tokenize_text(user_query)
    query_token_set = set(query_tokens)

    # BM25 打分
    bm25 = BM25Okapi(tokenized_corpus)
    doc_scores = bm25.get_scores(query_tokens)

    # 动态提权:若表注释包含查询关键词,则提升得分
    enhanced_scores = doc_scores.copy()
    for i, (comment, score) in enumerate(zip(table_comments, doc_scores)):
        if score <= 0:
            continue
        comment_tokens = self._tokenize_text(comment)
        comment_token_set = set(comment_tokens)
        overlap = query_token_set & comment_token_set
        if overlap:
            overlap_ratio = len(overlap) / len(query_token_set)
            enhanced_scores[i] += score * overlap_ratio * 1.5  # 提权 1.5 倍

    # 排序并打印 Top5
    scored_tables = sorted(
        [(idx, score) for idx, score in enumerate(enhanced_scores)],
        key=lambda x: x[1],
        reverse=True,
    )

    print(f"\n🔍 查询: {user_query}")
    for idx, score in scored_tables[:5]:
        table_name = table_names[idx]
        print(f" ✅ {table_name:12} | 得分: {score:.4f}")

    # 动态阈值过滤
    if not scored_tables:
        return {}
    max_score = scored_tables[0][1]
    threshold = max(max_score * 0.1, 0.5)  # 至少保留得分 > 0.5 或 > 最高分 10%
    top_indices = [i for i, s in scored_tables if s >= threshold][:5]

    filtered_table_info = {table_names[idx]: table_info[table_names[idx]] for idx in top_indices}
    logger.info(f"BM25 筛选出 {len(filtered_table_info)} 个相关表: {list(filtered_table_info.keys())}")
    return filtered_table_info

📊 过滤策略亮点:

  • 动态阈值:避免硬编码 TopK,适应不同查询复杂度。

  • 注释提权:表注释通常包含高层业务语义(如“销售订单”),命中即高相关。

  • 最多返回5张表:平衡上下文长度与召回率。

5️⃣ 集成到 Agent State(供大模型使用)

最后,将过滤后的表结构写入 AgentState,供后续 SQL 生成模块使用。

def get_table_schema(self, state: AgentState) -> AgentState:
    """
    获取数据库表结构,并根据用户查询筛选相关表。
    """
    try:
        logger.info("开始获取数据库表 schema 信息")
        all_table_info = self._fetch_all_table_info()
        user_query = state.get("user_query", "").strip()

        if user_query:
            filtered_info = self._filter_relevant_tables_by_bm25(all_table_info, user_query)
            state["db_info"] = filtered_info
        else:
            state["db_info"] = all_table_info

        logger.info(f"获取数据库表信息成功,共 {len(state['db_info'])} 张表")
    except Exception as e:
        logger.error(f"获取数据库表信息失败: {e}", exc_info=True)
        state["db_info"] = {}
        state["execution_result"] = ExecutionResult(success=False, error="无法连接数据库或获取元数据")

    return state

🎯 过滤效果

用户问:“查看最近一周新增的用户数量”

🔍 查询: 查看最近一周新增的用户数量
 ✅ users        | 得分: 2.8741
 ✅ user_log     | 得分: 1.9320
 ✅ audit_record | 得分: 0.7654
 ✅ orders       | 得分: 0.3210
 ✅ products     | 得分: 0.1023

最终只保留前 3 张表(得分 ≥ 0.5 且 ≥ 2.8741×0.1=0.287),orders 和 products 被过滤掉。

✅ 结果:SQL 生成模块仅看到 users、user_log、audit_record,大幅降低干扰。

🎁 总结

本文实现了一套轻量、高效的 Text2SQL 表过滤机制,核心优势:

  • ✅ 中文友好:基于 jieba 分词,天然支持中文业务术语

  • ✅ 语义感知:利用字段/表注释,而非仅表名

  • ✅ 动态适应:根据查询内容自动调整返回表数量

  • ✅ 低开销:BM25 计算快,适合实时推理

🔜 下一步动作:

  • 将过滤结果与 Neo4j 语义图谱 结合,提供“关系路径”给大模型 已实现

  • 引入 Embedding + 向量检索 作为 BM25 的补充(处理同义词、泛化)

📚 完整代码

参考我的开源项目:git@github.com:apconw/sanic-web.git

🌈 项目亮点

  • ✅ 集成 MCP 多智能体架构

  • ✅ 支持 Dify / LangChain / LlamaIndex / Ollama / vLLM / Neo4j

  • ✅ 前端采用 Vue3 + TypeScript + Vite5,现代化交互体验

  • ✅ 内置 ECharts / AntV 图表问答 + CSV 表格问答

  • ✅ 支持对接主流 RAG 系统 与 Text2SQL 引擎

  • ✅ 轻量级 Sanic 后端,适合快速部署与二次开发

运行效果:

数据问答

数据问答

数据问答

 AI大模型从0到精通全套学习大礼包

我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。

只要你是真心想学AI大模型,我这份资料就可以无偿共享给你学习。大模型行业确实也需要更多的有志之士加入进来,我也真心希望帮助大家学好这门技术,如果日后有什么学习上的问题,欢迎找我交流,有技术上面的问题,我是很愿意去帮助大家的!

如果你也想通过学大模型技术去帮助就业和转行,可以点扫描下方👇👇
大模型重磅福利:入门进阶全套104G学习资源包免费分享!
在这里插入图片描述

01.从入门到精通的全套视频教程

包含提示词工程、RAG、Agent等技术点
在这里插入图片描述

02.AI大模型学习路线图(还有视频解说)

全过程AI大模型学习路线

在这里插入图片描述

​​在这里插入图片描述

03.学习电子书籍和技术文档

市面上的大模型书籍确实太多了,这些是我精选出来的

在这里插入图片描述

04.大模型面试题目详解

在这里插入图片描述

在这里插入图片描述

05.这些资料真的有用吗?

这份资料由我和鲁为民博士共同整理,鲁为民博士先后获得了北京清华大学学士和美国加州理工学院博士学位,在包括IEEE Transactions等学术期刊和诸多国际会议上发表了超过50篇学术论文、取得了多项美国和中国发明专利,同时还斩获了吴文俊人工智能科学技术奖。目前我正在和鲁博士共同进行人工智能的研究。

所有的视频由智泊AI老师录制,且资料与智泊AI共享,相互补充。这份学习大礼包应该算是现在最全面的大模型学习资料了。

资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。

在这里插入图片描述
在这里插入图片描述

智泊AI始终秉持着“让每个人平等享受到优质教育资源”的育人理念‌,通过动态追踪大模型开发、数据标注伦理等前沿技术趋势‌,构建起"前沿课程+智能实训+精准就业"的高效培养体系。

课堂上不光教理论,还带着学员做了十多个真实项目。学员要亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事‌!

在这里插入图片描述
如果说你是以下人群中的其中一类,都可以来智泊AI学习人工智能,找到高薪工作,一次小小的“投资”换来的是终身受益!

应届毕业生‌:无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。

零基础转型‌:非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界‌。

业务赋能 ‌突破瓶颈:传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型‌。

👉获取方式:
😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓

在这里插入图片描述

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐