SQLBot通过构建三层语义检索RAG架构(数据源→表→业务知识),有效解决Text2SQL场景下的表结构复杂、业务术语理解偏差等问题。采用向量预计算+异步更新策略实现高性能检索,结合术语库和SQL示例增强业务语义理解。该方案不仅提高Text2SQL准确率,还可推广到API文档检索、代码补全等需要精准检索结构化知识的场景,具有高准确、高性能、易扩展等优势。


背景

在Text2SQL领域,如何让大模型准确理解用户意图并生成正确的SQL查询,一直是个挑战。传统方法往往面临表结构复杂、业务术语理解偏差、SQL示例缺失等问题。今天来学习一下SOLBot开源项目的实现, 本文基于开源项目SQLBot的源码实现,深入剖析其在Text2SQL场景下的语义检索RAG(Retrieval-Augmented Generation)技术方案,看看工程实践中如何解决这些痛点。

unsetunset👉 欢迎关注我的开源项目unsetunset

🎬 运行效果预览

📌 如果您这个项目感兴趣,欢迎「付费点赞」支持一下!

👉 下方点击「喜欢作者」,金额随意心意无价。 让我们在技术路上彼此赋能,少走弯路,高效落地!

付费后,请务必添加我的微信(微信号:weber812)并发送支付凭证,我将第一时间拉您进入专属「技术支持群」。

进群后您能获得哪些福利👉 完整代码具体请见下方说明!

unsetunset一、整体架构设计unsetunset

1.1 核心技术栈

技术组件 具体实现 作用
Embedding模型 HuggingFace text2vec-base-chinese 文本向量化,支持中文语义理解
向量检索 PostgreSQL pgvector扩展 高效的向量相似度计算
LLM框架 LangChain 大模型调用与Prompt工程
后端框架 FastAPI + SQLModel 高性能异步API服务
数据库 PostgreSQL 存储元数据与向量索引
异步处理 ThreadPoolExecutor 向量化任务异步执行

1.1 数据问答核心流程

1.2 语义检索逻辑

1.2 RAG三层检索体系

SQLBot构建了三层语义检索体系,分别针对不同粒度的知识:

┌─────────────────────────────────────────┐│         用户问题 (User Question)          │└──────────────┬──────────────────────────┘               │               ▼┌──────────────────────────────────────────┐│  第一层:数据源级别检索 (DS Embedding)      ││  - 从多个数据源中筛选最相关的数据源         ││  - 基于数据源名称+描述+表结构的向量         │└──────────────┬───────────────────────────┘               │               ▼┌──────────────────────────────────────────┐│  第二层:表级别检索 (Table Embedding)       ││  - 从数据源的众多表中筛选最相关的表         ││  - 基于表名+注释+字段结构的向量            │└──────────────┬───────────────────────────┘               │               ▼┌──────────────────────────────────────────┐│  第三层:业务知识增强                       ││  ├─ 术语库检索 (Terminology)              ││  │  - 业务术语与标准字段的映射              ││  └─ SQL示例检索 (Data Training)           ││     - 相似问题的SQL示例参考                │└──────────────────────────────────────────┘

unsetunset二、核心实现详解unsetunset

2.1 Embedding模型封装

SQLBot采用单例模式管理Embedding模型,避免重复加载,提升性能:

# apps/ai_model/embedding.pyclass EmbeddingModelCache:    @staticmethod    def get_model(key: str = settings.DEFAULT_EMBEDDING_MODEL,                  config: EmbeddingModelInfo = local_embedding_model) -> Embeddings:        model_instance = _embedding_model.get(key)        if model_instance isNone:            lock = EmbeddingModelCache._get_lock(key)            with lock:                model_instance = _embedding_model.get(key)                if model_instance isNone:                    model_instance = EmbeddingModelCache._new_instance(config)                    _embedding_model[key] = model_instance        return model_instance        @staticmethod    def _new_instance(config: EmbeddingModelInfo = local_embedding_model):        return HuggingFaceEmbeddings(            model_name=config.name,             cache_folder=config.folder,            model_kwargs={'device': config.device},            encode_kwargs={'normalize_embeddings': True}        )

设计亮点:

  • 双重检查锁定(Double-Check Locking)保证线程安全
  • 向量归一化(normalize_embeddings=True)优化余弦相似度计算
  • 支持CPU/GPU设备切换,适配不同部署环境

2.2 第一层:数据源级别检索

当系统配置了多个数据源时,首先需要确定用户问题属于哪个数据源的范畴。

2.2.1 数据源向量生成
# apps/datasource/crud/table.pydef save_ds_embedding(session_maker, ids: List[int]):    model = EmbeddingModelCache.get_model()    session = session_maker()        for _id in ids:        schema_table = ''        ds = session.query(CoreDatasource).filter(CoreDatasource.id == _id).first()                # 组合数据源信息        schema_table += f"{ds.name}, {ds.description}\n"                # 添加所有表的结构信息        tables = session.query(CoreTable).filter(CoreTable.ds_id == ds.id).all()        for table in tables:            fields = session.query(CoreField).filter(CoreField.table_id == table.id).all()            schema_table += f"# Table: {table.table_name}"                        table_comment = table.custom_comment.strip() if table.custom_comment else''            if table_comment:                schema_table += f", {table_comment}\n[\n"            else:                schema_table += '\n[\n'                        # 添加字段信息            field_list = []            for field in fields:                field_comment = field.custom_comment.strip() if field.custom_comment else''                if field_comment:                    field_list.append(f"({field.field_name}:{field.field_type}, {field_comment})")                else:                    field_list.append(f"({field.field_name}:{field.field_type})")                        schema_table += ",\n".join(field_list)            schema_table += '\n]\n'                # 生成向量并存储        emb = json.dumps(model.embed_query(schema_table))        stmt = update(CoreDatasource).where(CoreDatasource.id == _id).values(embedding=emb)        session.execute(stmt)        session.commit()
2.2.2 数据源检索逻辑
# apps/datasource/embedding/ds_embedding.pydef get_ds_embedding(session: SessionDep, current_user: CurrentUser, _ds_list,                      out_ds: AssistantOutDs, question: str) -> list:    _list = []        # 为每个数据源构建检索文本    for _ds in out_ds.ds_list:        ds = out_ds.get_ds(_ds.id)        table_schema = out_ds.get_db_schema(_ds.id, question, embedding=False)        ds_info = f"{ds.name}, {ds.description}\n"        ds_schema = ds_info + table_schema        _list.append({            "id": ds.id,             "ds_schema": ds_schema,             "cosine_similarity": 0.0,             "ds": ds        })        if _list:        text = [s.get('ds_schema') for s in _list]                # 批量向量化        model = EmbeddingModelCache.get_model()        results = model.embed_documents(text)                # 计算问题向量        q_embedding = model.embed_query(question)                # 计算余弦相似度        for index in range(len(results)):            item = results[index]            _list[index]['cosine_similarity'] = cosine_similarity(q_embedding, item)                # 排序并取TopK        _list.sort(key=lambda x: x['cosine_similarity'], reverse=True)        _list = _list[:settings.DS_EMBEDDING_COUNT]  # 默认取Top10                return [{"id": obj.get('ds').id,                  "name": obj.get('ds').name,                  "description": obj.get('ds').description}                for obj in _list]        return _list

关键点:

  • 数据源向量包含:名称+描述+完整表结构,信息量充足
  • 支持实时计算和预存储两种模式,灵活应对不同场景
  • TopK可配置,默认10个,平衡精度与性能

2.3 第二层:表级别检索

确定数据源后,需要从数百张表中筛选出与问题最相关的表。

2.3.1 表向量生成
# apps/datasource/crud/table.pydef save_table_embedding(session_maker, ids: List[int]):    model = EmbeddingModelCache.get_model()    session = session_maker()        for _id in ids:        table = session.query(CoreTable).filter(CoreTable.id == _id).first()        fields = session.query(CoreField).filter(CoreField.table_id == table.id).all()                # 构建表的Schema描述        schema_table = f"# Table: {table.table_name}"                table_comment = table.custom_comment.strip() if table.custom_comment else''        if table_comment:            schema_table += f", {table_comment}\n[\n"        else:            schema_table += '\n[\n'                # 添加字段信息        if fields:            field_list = []            for field in fields:                field_comment = field.custom_comment.strip() if field.custom_comment else''                if field_comment:                    field_list.append(f"({field.field_name}:{field.field_type}, {field_comment})")                else:                    field_list.append(f"({field.field_name}:{field.field_type})")            schema_table += ",\n".join(field_list)                schema_table += '\n]\n'                # 生成向量并持久化        emb = json.dumps(model.embed_query(schema_table))        stmt = update(CoreTable).where(CoreTable.id == _id).values(embedding=emb)        session.execute(stmt)        session.commit()
2.3.2 表检索实现
# apps/datasource/embedding/table_embedding.pydef calc_table_embedding(tables: list[dict], question: str):    _list = []    for table in tables:        _list.append({            "id": table.get('id'),             "schema_table": table.get('schema_table'),             "embedding": table.get('embedding'),            "cosine_similarity": 0.0        })        if _list:        model = EmbeddingModelCache.get_model()                # 使用预存储的向量        results = [item.get('embedding') for item in _list]                # 计算问题向量        q_embedding = model.embed_query(question)                # 计算相似度        for index in range(len(results)):            item = results[index]            if item:                _list[index]['cosine_similarity'] = cosine_similarity(                    q_embedding,                     json.loads(item)                )                # 排序取TopK        _list.sort(key=lambda x: x['cosine_similarity'], reverse=True)        _list = _list[:settings.TABLE_EMBEDDING_COUNT]  # 默认Top10                return _list        return _list

优化策略:

  • 表向量预计算并持久化到数据库,避免实时计算开销
  • 仅对问题进行实时向量化,大幅提升响应速度
  • 支持增量更新,表结构变更时异步更新向量
2.3.3 余弦相似度计算
# apps/datasource/embedding/utils.pydef cosine_similarity(vec_a, vec_b):    if len(vec_a) != len(vec_b):        raise ValueError("The vector dimension must be the same")        # 点积    dot_product = sum(a * b for a, b in zip(vec_a, vec_b))        # 向量模    norm_a = math.sqrt(sum(a * a for a in vec_a))    norm_b = math.sqrt(sum(b * b for b in vec_b))        if norm_a == 0or norm_b == 0:        return0.0        return dot_product / (norm_a * norm_b)

2.4 第三层:业务知识增强

2.4.1 术语库检索

业务系统中存在大量专业术语,如"GMV"、“UV”、"ROI"等,需要映射到具体的数据库字段。

术语向量生成:

# apps/terminology/curd/terminology.pydef save_embeddings(session_maker, ids: List[int]):    ifnot settings.EMBEDDING_ENABLED:        return        session = session_maker()    # 查询术语及其同义词    _list = session.query(Terminology).filter(        or_(Terminology.id.in_(ids), Terminology.pid.in_(ids))    ).all()        _words_list = [item.word for item in _list]        # 批量向量化    model = EmbeddingModelCache.get_model()    results = model.embed_documents(_words_list)        # 存储向量    for index in range(len(results)):        item = results[index]        stmt = update(Terminology).where(Terminology.id == _list[index].id).values(            embedding=item        )        session.execute(stmt)        session.commit()

术语检索(PostgreSQL向量检索):

# apps/terminology/curd/terminology.pyembedding_sql = f"""SELECT id, pid, word, similarityFROM(SELECT id, pid, word, oid, specific_ds, datasource_ids, enabled,( 1 - (embedding <=> :embedding_array) ) AS similarityFROM terminology AS child) TEMPWHERE similarity > {settings.EMBEDDING_TERMINOLOGY_SIMILARITY}   AND oid = :oid   AND enabled = true  AND (specific_ds = false OR specific_ds IS NULL)ORDER BY similarity DESCLIMIT {settings.EMBEDDING_TERMINOLOGY_TOP_COUNT}"""def select_terminology_by_word(session: SessionDep, word: str, oid: int,                                datasource: int = None):    _list: List[Terminology] = []        # 1. 先进行模糊匹配    stmt = select(Terminology.id, Terminology.pid, Terminology.word).where(        and_(            text(":sentence ILIKE '%' || word || '%'"),             Terminology.oid == oid,             Terminology.enabled == True        )    )    results = session.execute(stmt, {'sentence': word}).fetchall()        for row in results:        _list.append(Terminology(id=row.id, word=row.word, pid=row.pid))        # 2. 再进行向量检索    if settings.EMBEDDING_ENABLED:        model = EmbeddingModelCache.get_model()        embedding = model.embed_query(word)                results = session.execute(            text(embedding_sql),            {'embedding_array': str(embedding), 'oid': oid}        ).fetchall()                for row in results:            _list.append(Terminology(id=row.id, word=row.word, pid=row.pid))        # 3. 去重并查询完整信息    _ids = list(set([row.id if row.pid isNoneelse row.pid for row in _list]))        t_list = session.query(        Terminology.id, Terminology.pid, Terminology.word, Terminology.description    ).filter(or_(Terminology.id.in_(_ids), Terminology.pid.in_(_ids))).all()        # 4. 组织返回结果    _map = {}    for row in t_list:        pid = str(row.pid) if row.pid isnotNoneelse str(row.id)        if _map.get(pid) isNone:            _map[pid] = {'words': [], 'description': ''}        if row.pid isNone:            _map[pid]['description'] = row.description        _map[pid]['words'].append(row.word)        return list(_map.values())

关键特性:

  • 使用PostgreSQL的<=>操作符进行向量距离计算
  • 相似度阈值可配置(EMBEDDING_TERMINOLOGY_SIMILARITY,默认0.4)
  • 支持数据源级别的术语隔离(specific_ds字段)
  • 模糊匹配+向量检索双重保障,提高召回率
2.4.2 SQL示例检索(Data Training)

针对复杂查询,提供历史SQL示例作为参考:

# apps/data_training/curd/data_training.pyembedding_sql = f"""SELECT id, datasource, question, similarityFROM(SELECT id, datasource, question, oid, enabled,( 1 - (embedding <=> :embedding_array) ) AS similarityFROM data_training AS child) TEMPWHERE similarity > {settings.EMBEDDING_DATA_TRAINING_SIMILARITY}   and oid = :oid   and datasource = :datasource   and enabled = trueORDER BY similarity DESCLIMIT {settings.EMBEDDING_DATA_TRAINING_TOP_COUNT}"""def select_training_by_question(session: SessionDep, question: str, oid: int,                                 datasource: Optional[int] = None):    _list: List[DataTraining] = []        # 1. 模糊匹配    stmt = select(DataTraining.id, DataTraining.question).where(        and_(            or_(                text(":sentence ILIKE '%' || question || '%'"),                 text("question ILIKE '%' || :sentence || '%'")            ),            DataTraining.oid == oid,            DataTraining.enabled == True,            DataTraining.datasource == datasource        )    )    results = session.execute(stmt, {'sentence': question}).fetchall()        for row in results:        _list.append(DataTraining(id=row.id, question=row.question))        # 2. 向量检索    if settings.EMBEDDING_ENABLED:        model = EmbeddingModelCache.get_model()        embedding = model.embed_query(question)                results = session.execute(            text(embedding_sql),            {'embedding_array': str(embedding), 'oid': oid, 'datasource': datasource}        )                for row in results:            _list.append(DataTraining(id=row.id, question=row.question))        # 3. 去重并查询完整信息    _ids = list(set([row.id for row in _list]))        t_list = session.query(        DataTraining.id, DataTraining.question, DataTraining.description    ).filter(DataTraining.id.in_(_ids)).all()        _map = {}    for row in t_list:        _map[row.id] = {            'question': row.question,             'suggestion-answer': row.description        }        return list(_map.values())

SQL示例格式化为XML:

def to_xml_string(_dict: list[dict] | dict, root: str = 'sql-examples') -> str:    item_name_func = lambda x: 'sql-example'if x == 'sql-examples'else'item'    xml = dicttoxml.dicttoxml(        _dict,        cdata=['question', 'suggestion-answer'],        custom_root=root,        item_func=item_name_func,        xml_declaration=False,        encoding='utf-8',        attr_type=False    ).decode('utf-8')        pretty_xml = parseString(xml).toprettyxml()        # 清理XML声明    if pretty_xml.startswith('<?xml'):        end_index = pretty_xml.find('>') + 1        pretty_xml = pretty_xml[end_index:].lstrip()        # 替换XML转义字符    escape_map = {        '<': '<',        '>': '>',        '&': '&',        '"': '"',        ''': "'"    }    for escaped, original in escape_map.items():        pretty_xml = pretty_xml.replace(escaped, original)        return pretty_xml

示例输出:

<sql-examples>  <sql-example>    <question><![CDATA[查询最近7天的销售额趋势]]></question>    <suggestion-answer><![CDATA[      SELECT DATE(order_time) as date, SUM(amount) as total_sales      FROM orders      WHERE order_time >= CURRENT_DATE - INTERVAL '7 days'      GROUP BY DATE(order_time)      ORDER BY date    ]]></suggestion-answer>  </sql-example></sql-examples>

2.5 异步向量化处理

为避免阻塞主线程,向量生成采用异步处理:

# common/utils/embedding_threads.pyfrom concurrent.futures import ThreadPoolExecutorexecutor = ThreadPoolExecutor(max_workers=200)def run_save_table_embeddings(ids: List[int]):    from apps.datasource.crud.table import save_table_embedding    executor.submit(save_table_embedding, session_maker, ids)def run_save_ds_embeddings(ids: List[int]):    from apps.datasource.crud.table import save_ds_embedding    executor.submit(save_ds_embedding, session_maker, ids)def run_save_terminology_embeddings(ids: List[int]):    from apps.terminology.curd.terminology import save_embeddings    executor.submit(save_embeddings, session_maker, ids)def run_save_data_training_embeddings(ids: List[int]):    from apps.data_training.curd.data_training import save_embeddings    executor.submit(save_embeddings, session_maker, ids)

触发时机:

  • 数据源创建/更新时
  • 表结构修改时
  • 术语库/SQL示例新增/编辑时
  • 系统启动时填充空向量(fill_empty_embeddings)

unsetunset三、完整工作流程unsetunset

3.1 端到端流程图

用户提问: "查询最近一周销售额最高的前10个商品"    │    ▼┌─────────────────────────────────────────┐│ 1. 数据源检索 (如果配置了多数据源)        ││    - 向量化问题                          ││    - 计算与各数据源的相似度               ││    - 选择Top1数据源: "电商订单库"         │└──────────────┬──────────────────────────┘               │               ▼┌─────────────────────────────────────────┐│ 2. 表级别检索                            ││    - 获取数据源下所有表的预存向量         ││    - 计算相似度排序                      ││    - 筛选Top10表:                        ││      • orders (0.85)                    ││      • order_items (0.82)               ││      • products (0.78)                  ││      • ...                              │└──────────────┬──────────────────────────┘               │               ▼┌─────────────────────────────────────────┐│ 3. 术语库检索                            ││    - 提取问题中的关键词: "销售额"         ││    - 检索术语库:                         ││      销售额 -> SUM(order_items.price * quantity) ││    - 补充到Prompt                        │└──────────────┬──────────────────────────┘               │               ▼┌─────────────────────────────────────────┐│ 4. SQL示例检索                           ││    - 检索相似问题的SQL:                   ││      "统计每日销售额Top10商品"            ││    - 格式化为XML示例                     ││    - 补充到Prompt                        │└──────────────┬──────────────────────────┘               │               ▼┌─────────────────────────────────────────┐│ 5. 构建最终Prompt                        ││    - Schema: 筛选后的表结构               ││    - Terminology: 术语映射关系            ││    - Examples: SQL示例                   ││    - Question: 用户问题                  │└──────────────┬──────────────────────────┘               │               ▼┌─────────────────────────────────────────┐│ 6. LLM生成SQL                            ││    SELECT p.product_name,                ││           SUM(oi.price * oi.quantity) as sales ││    FROM order_items oi                   ││    JOIN products p ON oi.product_id = p.id ││    JOIN orders o ON oi.order_id = o.id   ││    WHERE o.order_time >= CURRENT_DATE - 7 ││    GROUP BY p.product_name               ││    ORDER BY sales DESC                   ││    LIMIT 10                              │└─────────────────────────────────────────┘

3.2 配置参数说明

配置项 默认值 说明
TABLE_EMBEDDING_ENABLED True 是否启用表级别向量检索
TABLE_EMBEDDING_COUNT 10 表检索TopK数量
DS_EMBEDDING_COUNT 10 数据源检索TopK数量
EMBEDDING_ENABLED True 是否启用术语/示例向量检索
EMBEDDING_TERMINOLOGY_SIMILARITY 0.4 术语检索相似度阈值
EMBEDDING_TERMINOLOGY_TOP_COUNT 5 术语检索TopK数量
EMBEDDING_DATA_TRAINING_SIMILARITY 0.4 SQL示例检索相似度阈值
EMBEDDING_DATA_TRAINING_TOP_COUNT 5 SQL示例检索TopK数量
DEFAULT_EMBEDDING_MODEL shibing624/text2vec-base-chinese Embedding模型名称

unsetunset四、性能优化策略unsetunset

4.1 向量预计算与缓存

问题: 实时向量化开销大,影响响应速度

解决方案:

  • 表结构向量、数据源向量、术语向量、SQL示例向量全部预计算
  • 存储在PostgreSQL的embedding字段(VECTOR类型)
  • 仅对用户问题进行实时向量化

效果对比:

场景 实时计算 预计算 提升
100张表检索 ~2000ms ~50ms 40倍
1000条术语检索 ~5000ms ~80ms 62倍

4.2 批量向量化

# 批量处理,减少模型调用次数text = [s.get('schema_table') for s in _list]results = model.embed_documents(text)  # 一次调用处理多个文本

4.3 异步更新机制

# 数据更新时异步触发向量更新,不阻塞主流程def update_table_and_fields(session: SessionDep, data: TableObj):    update_table(session, data.table)    for field in data.fields:        update_field(session, field)        # 异步更新向量    run_save_table_embeddings([data.table.id])    run_save_ds_embeddings([data.table.ds_id])

4.4 PostgreSQL向量索引优化

-- 使用pgvector扩展的HNSW索引加速检索CREATE INDEX terminology_embedding_idx ON terminology USING hnsw (embedding vector_cosine_ops);CREATE INDEX data_training_embedding_idx ON data_training USING hnsw (embedding vector_cosine_ops);CREATE INDEX core_table_embedding_idx ON core_table USING hnsw (embedding vector_cosine_ops);

unsetunset五、工程实践经验unsetunset

5.1 Schema描述的艺术

好的Schema描述:

# Table: order_items, 订单明细表[(id:BIGINT, 主键ID),(order_id:BIGINT, 订单ID),(product_id:BIGINT, 商品ID),(quantity:INT, 购买数量),(price:DECIMAL, 单价),(created_at:TIMESTAMP, 创建时间)]

差的Schema描述:

# Table: order_items[(id:BIGINT),(order_id:BIGINT),(product_id:BIGINT),(quantity:INT),(price:DECIMAL),(created_at:TIMESTAMP)]

关键点:

  • 表注释必须清晰说明业务含义
  • 字段注释要包含业务语义,不仅仅是字段名翻译
  • 注释质量直接影响向量检索准确率

5.2 术语库设计原则

1. 主词+同义词结构

主词: GMV同义词: [成交总额, 交易总额, 总成交金额]描述: SUM(order_amount) WHERE order_status = 'paid'

2. 支持数据源级别隔离

  • 不同业务线的同一术语可能含义不同
  • 通过specific_ds字段实现隔离

3. 启用/禁用控制

  • 过时术语可禁用而非删除,保留历史记录

5.3 SQL示例的组织

示例格式:

{    "question": "查询最近7天每日销售额",    "suggestion-answer": """        SELECT DATE(order_time) as date,                SUM(amount) as daily_sales        FROM orders        WHERE order_time >= CURRENT_DATE - INTERVAL '7 days'        GROUP BY DATE(order_time)        ORDER BY date    """}

最佳实践:

  • 问题描述要自然,贴近用户真实表达
  • SQL要规范,包含必要的注释
  • 覆盖常见查询模式:聚合、关联、时间范围、排序等
  • 定期review和更新,剔除低质量示例

5.4 向量检索的阈值调优

相似度阈值设置建议:

场景 推荐阈值 理由
术语检索 0.3-0.5 术语表达多样,阈值过高会漏召回
SQL示例检索 0.4-0.6 问题表达相对规范,可适当提高
表检索 无阈值,TopK 必须返回结果,用TopK控制数量

调优方法:

  1. 收集badcase,分析相似度分布
  2. 绘制precision-recall曲线
  3. 根据业务容忍度选择平衡点

unsetunset六、常见问题与解决方案unsetunset

6.1 向量检索召回率低

现象: 明明术语库有配置,但检索不到

排查步骤:

  1. 检查向量是否生成:SELECT id, word FROM terminology WHERE embedding IS NULL
  2. 检查相似度阈值是否过高
  3. 检查术语的enabled状态
  4. 检查specific_ds配置是否正确

解决方案:

  • 运行fill_empty_embeddings填充空向量
  • 调低相似度阈值
  • 增加同义词覆盖

6.2 表检索不准确

现象: 检索出的表与问题无关

原因分析:

  • 表注释缺失或不准确
  • 字段注释缺失
  • 表结构过于相似,难以区分

解决方案:

  • 完善表和字段的custom_comment
  • 在表注释中添加典型使用场景描述
  • 利用表关系信息辅助判断

6.3 向量更新延迟

现象: 修改了表结构,但检索结果未更新

原因: 异步更新任务积压或失败

排查:

# 查看线程池状态executor._threads  # 查看活跃线程数executor._work_queue.qsize()  # 查看待处理任务数

解决方案:

  • 增加线程池大小:ThreadPoolExecutor(max_workers=500)
  • 添加任务监控和告警
  • 失败任务重试机制

unsetunset七、优化方向unsetunset

7.1 混合检索

结合BM25等传统检索方法,提升召回率:

# 伪代码def hybrid_search(question, tables):    # 向量检索    vector_results = vector_search(question, tables, top_k=20)        # BM25检索    bm25_results = bm25_search(question, tables, top_k=20)        # 融合排序(RRF: Reciprocal Rank Fusion)    final_results = rrf_merge(vector_results, bm25_results, top_k=10)        return final_results

7.2 用户反馈学习

收集用户对SQL结果的反馈,优化检索策略:

# 记录用户反馈feedback = {    "question": "查询销售额",    "retrieved_tables": ["orders", "products"],    "user_feedback": "positive",  # or "negative"    "correct_tables": ["order_items", "products"]  # 用户纠正}# 基于反馈微调Embedding模型或调整检索权重

7.3 多模态检索

支持图表、ER图等多模态信息:

# 结合ER图信息table_context = {    "table_schema": schema_text,    "er_diagram": er_image_embedding,    "usage_frequency": 0.8,    "typical_queries": [...]}

unsetunset八、总结unsetunset

SQLBot的语义检索RAG方案,通过三层检索体系(数据源→表→业务知识),有效解决了Text2SQL场景下的核心痛点:

  1. 多数据源场景: 通过数据源级别向量检索,准确定位目标数据源
  2. 大规模表结构: 通过表级别向量检索,从数百张表中筛选相关表
  3. 业务术语理解: 通过术语库向量检索,建立业务语言与数据库字段的映射
  4. 复杂查询参考: 通过SQL示例向量检索,提供历史案例参考

核心优势:

  • 高性能: 向量预计算+异步更新,检索耗时<100ms
  • 高准确: 三层检索+双重保障(模糊+向量),召回率>90%
  • 易扩展: 模块化设计,支持灵活配置和定制
  • 工程化: 完善的异常处理、监控、配置管理

这套方案不仅适用于Text2SQL,也可推广到其他需要精准检索结构化知识的场景,如API文档检索、代码补全等。关键在于理解业务特点,设计合理的向量化策略和检索层次。


参考资源:

  • SQLBot开源地址: https://github.com/dataease/SQLBot
  • pgvector文档: https://github.com/pgvector/pgvector
  • text2vec模型: https://huggingface.co/shibing624/text2vec-base-chinese

unsetunset📚 完整代码unsetunset

  • 项目地址: https://github.com/apconw/sanic-web

🌈 项目亮点

  • ✅ 集成 MCP 多智能体架构
  • ✅ 支持 Dify / LangChain / LlamaIndex / Ollama / vLLM / Neo4j
  • ✅ 前端采用 Vue3 + TypeScript + Vite5,现代化交互体验
  • ✅ 内置 ECharts / AntV 图表问答 + CSV 表格问答
  • ✅ 支持对接主流 RAG 系统 与 Text2SQL 引擎
  • ✅ 轻量级 Sanic 后端,适合快速部署与二次开发
  • 项目已被蚂蚁官方推荐收录

AntV

运行效果:

数据问答

如何系统的学习大模型 AI ?

由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。

但是具体到个人,只能说是:

“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。

这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。

我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。

我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。

一直在更新,更多的大模型学习和面试资料已经上传带到CSDN的官方了,有需要的朋友可以扫描下方二维码免费领取【保证100%免费】👇👇

在这里插入图片描述

01.大模型风口已至:月薪30K+的AI岗正在批量诞生

在这里插入图片描述

2025年大模型应用呈现爆发式增长,根据工信部最新数据:

国内大模型相关岗位缺口达47万

初级工程师平均薪资28K(数据来源:BOSS直聘报告)

70%企业存在"能用模型不会调优"的痛点

真实案例:某二本机械专业学员,通过4个月系统学习,成功拿到某AI医疗公司大模型优化岗offer,薪资直接翻3倍!

02.大模型 AI 学习和面试资料

1️⃣ 提示词工程:把ChatGPT从玩具变成生产工具
2️⃣ RAG系统:让大模型精准输出行业知识
3️⃣ 智能体开发:用AutoGPT打造24小时数字员工

📦熬了三个大夜整理的《AI进化工具包》送你:
✔️ 大厂内部LLM落地手册(含58个真实案例)
✔️ 提示词设计模板库(覆盖12大应用场景)
✔️ 私藏学习路径图(0基础到项目实战仅需90天)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

第一阶段(10天):初阶应用

该阶段让大家对大模型 AI有一个最前沿的认识,对大模型 AI 的理解超过 95% 的人,可以在相关讨论时发表高级、不跟风、又接地气的见解,别人只会和 AI 聊天,而你能调教 AI,并能用代码将大模型和业务衔接。

  • 大模型 AI 能干什么?
  • 大模型是怎样获得「智能」的?
  • 用好 AI 的核心心法
  • 大模型应用业务架构
  • 大模型应用技术架构
  • 代码示例:向 GPT-3.5 灌入新知识
  • 提示工程的意义和核心思想
  • Prompt 典型构成
  • 指令调优方法论
  • 思维链和思维树
  • Prompt 攻击和防范

第二阶段(30天):高阶应用

该阶段我们正式进入大模型 AI 进阶实战学习,学会构造私有知识库,扩展 AI 的能力。快速开发一个完整的基于 agent 对话机器人。掌握功能最强的大模型开发框架,抓住最新的技术进展,适合 Python 和 JavaScript 程序员。

  • 为什么要做 RAG
  • 搭建一个简单的 ChatPDF
  • 检索的基础概念
  • 什么是向量表示(Embeddings)
  • 向量数据库与向量检索
  • 基于向量检索的 RAG
  • 搭建 RAG 系统的扩展知识
  • 混合检索与 RAG-Fusion 简介
  • 向量模型本地部署

第三阶段(30天):模型训练

恭喜你,如果学到这里,你基本可以找到一份大模型 AI相关的工作,自己也能训练 GPT 了!通过微调,训练自己的垂直大模型,能独立训练开源多模态大模型,掌握更多技术方案。

到此为止,大概2个月的时间。你已经成为了一名“AI小子”。那么你还想往下探索吗?

  • 为什么要做 RAG
  • 什么是模型
  • 什么是模型训练
  • 求解器 & 损失函数简介
  • 小实验2:手写一个简单的神经网络并训练它
  • 什么是训练/预训练/微调/轻量化微调
  • Transformer结构简介
  • 轻量化微调
  • 实验数据集的构建

第四阶段(20天):商业闭环

对全球大模型从性能、吞吐量、成本等方面有一定的认知,可以在云端和本地等多种环境下部署大模型,找到适合自己的项目/创业方向,做一名被 AI 武装的产品经理。

  • 硬件选型
  • 带你了解全球大模型
  • 使用国产大模型服务
  • 搭建 OpenAI 代理
  • 热身:基于阿里云 PAI 部署 Stable Diffusion
  • 在本地计算机运行大模型
  • 大模型的私有化部署
  • 基于 vLLM 部署大模型
  • 案例:如何优雅地在阿里云私有部署开源大模型
  • 部署一套开源 LLM 项目
  • 内容安全
  • 互联网信息服务算法备案

学习是一个过程,只要学习就会有挑战。天道酬勤,你越努力,就会成为越优秀的自己。

如果你能在15天内完成所有的任务,那你堪称天才。然而,如果你能完成 60-70% 的内容,你就已经开始具备成为一名大模型 AI 的正确特征了。

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐