为什么需要提取Q&A?主要有两个用途,一个用途是为了去训练模型,让模型朝着我希望的方向发展;另一个是可以提高RAG的准确率,提高相关性,准确性,使得用户的问题能够更好的匹配查询,避免大模型胡乱生成答案。


前言

提供一个面向电力技术文本的“问答数据集”生成工具,利用 OpenAI 大模型(可接私有 / 代理 API)自动把零散的专业文本切片批量转化为高质量、结构化的 问题–答案 (Q&A)。

典型应用场景:

  • 快速构建电力行业知识库 / 向量库。
  • 生成 LLM 对电力场景的指令微调数据。
  • 自助创建运维、电网培训题库。

一、整体架构

graph TD
    A[命令行入口 main()] --> B[process_excel()]
    B --> C[读取 Excel<br/>content 列]
    C --> D[批量拼接文本 batch_text]
    D --> E[PowerQA_Generator.generate_qa_pairs()]
    E -->|JSON Q&A| F[验证 + 过滤]
    F --> G{中间保存?}
    G -->|是| H[save_intermediate()]
    G -->|否| I[继续循环]
    I --> J[全部结束后<br/>save_final_output()]
    J --> K[generate_analysis_report()]

二、依赖与运行环境

  • Python ≥3.8
  • pandas、tqdm、openai、argparse、logging、re 等标准库/三方库

(requirements.txt 推荐:pandas>=2.0.0, openai>=1.10.0, tqdm>=4.60)

三、核心类 PowerQA_Generator

方法 作用 关键实现
__init__() 建立 OpenAI client 与 Prompt 模板 「多文本片段」提示词,嵌入电力子领域要求
generate_qa_pairs() 与 LLM 交互,拿到 JSON 格式的 Q&A 检查嵌套 ```json```、多次重试、日志
is_valid_power_qa() 行业词汇 + 最小长度过滤 电力关键词列表近百条,防止跑偏
test_connection() 预检 API 连通性 可避免长时间空跑

四、Excel 批处理流程

1. 读取– process_excel()
读取指定 Excel 文件,要求存在 content 列(已切好的文本块 / 子句)。
2. 批次划分
batch_size 决定一次聚合几个文本片段,避免 prompt 超长。
3. 合并片段
同批用 --------------- 分隔,作为单轮 prompt 输入 LLM。
4. 生成与过滤
调用 generate_qa_pairs() 拿到一段 JSON;再与关键词白名单比对过滤噪声。
5. 中间结果落盘
每 2 个批次自动调用 save_intermediate() 生成临时 XLSX,容灾友好。
6. 最终输出
save_final_output() 写成汇总表,并追加行业后处理字段(见下一节)。

五、电力领域专属后处理 

函数 power_domain_postprocessing() 会对每条 Q&A 做增值加工:

  • 问题 技术分类(设备原理 / 操作规程 / 故障处理 / …)。
  • 正则提取 电压等级(如 10 kV、500 kV)。
  • 标记答案中是否含有 技术参数(kV、Hz、%…)。
  • 衍生字段 batch_info 方便回溯来源。

再由 generate_analysis_report() 输出文字版统计报告,包含:

  • 批次、分类分布
  • 电压等级 Top N
  • 技术参数占比
  • 随机抽样 5 条高质量示例

六、命令行用法

# 命令行参数解析
    parser = argparse.ArgumentParser(description='电力领域问答对生成工具')
    parser.add_argument('--input', type=str, default='chunks_final.xlsx',
                        help='输入Excel文件路径 (默认: power_books.xlsx)')
    parser.add_argument('--output', type=str, default='power_qa_dataset.xlsx',
                        help='输出Excel文件路径 (默认: power_qa_dataset.xlsx)')
    parser.add_argument('--batch_size', type=int, default=10,
                        help='每批处理的文本块数量 (默认: 10)')
    parser.add_argument('--max_batches', type=int, default=None,
                        help='最大处理批次数 (默认: 全部批次)')
    parser.add_argument('--api_key', type=str, default='sk-WVoD66MR',
                        help='OpenAI API密钥')
    parser.add_argument('--api_base', type=str, default='',
                        help='OpenAI API基础URL')

常用参数说明:

参数 默认 作用
--input kv_store_text_chunks_final.xlsx 含文本片段的源文件
--output power_qa_dataset.xlsx 目标问答 Excel
--batch_size 10 每批拼接多少文本片段
--max_batches None 只跑前 N 批,调试友好
--api_key / --api_base 示例值 私有化 / 国内代理可替换

七、日志与容错 

  • 全程写入 power_qa_generator.log,包含原始 JSON 响应、防止定位难。
  • JSON 解析失败、API 429/500 等自动重试;依旧失败则跳过当前批,流程不中断。
  • time.sleep(1.5) 控速;可视情况调整。

八、可扩展点与优化建议

  • 并发调度:当前串行调用,配合 asyncio 或多线程 + token 速率窗可提速数倍。
  • 关键字池:is_valid_power_qa() 可外置到 YAML/JSON,便于持续更新。
  • Prompt 参数化:把子领域列表、问题类型枚举拆为配置文件,支持跨行业复用。
  • 向量去重:生成后可计算语义相似度,剔除冗余问答,提升数据质量。
  • Web UI:封装为 Gradio / Streamlit,使非技术人员也能上传 Excel 一键生成。

 九、完整代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import json
import time
import re
import os
import logging
import argparse
from openai import OpenAI
from tqdm import tqdm
from datetime import datetime

# 配置日志
logging.basicConfig(
    filename='power_qa_generator.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


class PowerQA_Generator:
    def __init__(self, api_key, api_base, model_name="gpt-4o"):
        self.client = OpenAI(api_key=api_key, base_url=api_base)
        self.model_name = model_name
        # 电力领域优化的提示词模板(支持多文本块)
        self.prompt_template = """
您是一位电力系统专家,负责从电力技术文本中创建高质量的问答数据集。请基于下面提供的多个相关文本片段,严格按照内容生成专业、准确的问答对。

## 专业要求:
1. 生成1-10个不同角度的专业问题,特别关注以下电力领域:
   - 电力设备技术:变压器、断路器、继电保护等设备的工作原理、技术参数和维护要求
   - 电网运行:频率控制、电压调节、稳定性分析、负荷预测等
   - 电力安全:操作规程、安全距离、防护措施、事故预防
   - 故障处理:短路计算、保护动作、事故分析流程
   - 新能源技术:光伏/风电并网技术,储能系统应用
   - 电力经济:电价机制、市场交易、成本分析
   - 标准规范:国家标准(GB)、行业规范(DL)、技术导则

2. 问题类型应覆盖多个文本片段的内容:
   [定义类] 如:什么是差动保护?其工作原理是什么?
   [参数类] 如:220kV变压器的空载损耗标准值是多少?
   [流程类] 如:倒闸操作的基本步骤和安全注意事项有哪些?
   [故障类] 如:发生单相接地故障时有哪些现象?如何处理?
   [计算类] 如:如何计算短路电流?需要考虑哪些因素?
   [安全类] 如:10kV线路的安全距离是多少?在哪些情况下需要增加距离?
   [标准类] 如:GB 50150-2016对变压器交接试验有哪些要求?

3. 答案要求:
   - 严格基于文本内容,准确简洁(不超过500字)
   - 包含具体技术参数(如电压等级、时间要求、数值范围等)
   - 使用专业术语(如:绝缘子、谐波、功率因数、继电保护等)
   - 整合多个文本片段的信息形成完整答案
   - 避免生成文本中未提及的内容

4. 使用JSON格式输出:{"questions": [{"question": "...", "answer": "..."}, ...]}

## 电力专业文本片段(由多个相关段落组成):
{text_segment}
"""

    def generate_qa_pairs(self, text_segment, max_retries=3):
        """生成电力专业问答对(支持多文本块)"""
        # prompt = self.prompt_template.format(text_segment=text_segment)
        # 使用 replace 而不是 format,避免模板中 JSON 示例里的花括号被误解析
        prompt = self.prompt_template.replace("{text_segment}", text_segment)

        for attempt in range(max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {"role": "system", "content": "您是电力系统高级工程师,专注于从多个技术文本中提取和整合知识,只能输出 JSON。"},
                        {"role": "user", "content": prompt + "\n\n请严格仅输出 JSON 字符串,不要添加解释文字。"}
                    ],
                    temperature=0.2,
                    response_format={"type": "json_object"}
                )

                # 解析JSON响应
                json_str = response.choices[0].message.content.strip()
                logging.info(f"RAW_RESPONSE: {json_str}")

                # 处理可能的 ```json ... ``` 代码块
                if json_str.startswith("```"):
                    # 去掉开头与结尾的```及语言标记
                    json_str = re.sub(r"^```[a-zA-Z]*\s*|```$", "", json_str, flags=re.S).strip()

                # 若仍不是以 { 开头,尝试提取首个 JSON 对象
                if not json_str.startswith("{"):
                    match = re.search(r"\{.*\}", json_str, re.S)
                    if match:
                        json_str = match.group(0)

                qa_data = json.loads(json_str)

                # 验证数据结构
                if "questions" not in qa_data or not isinstance(qa_data["questions"], list):
                    raise ValueError("响应格式错误: 缺少'questions'列表")

                # 清理答案并添加电力领域特殊处理
                validated_pairs = []
                for qa in qa_data["questions"]:
                    # 验证必要字段
                    if "question" not in qa or "answer" not in qa:
                        continue

                    # 清理文本
                    qa["question"] = re.sub(r'\s+', ' ', qa["question"]).strip()
                    qa["answer"] = re.sub(r'\s+', ' ', qa["answer"]).strip()

                    # 电力领域质量检查
                    if self.is_valid_power_qa(qa["question"], qa["answer"]):
                        validated_pairs.append({
                            "question": qa["question"],
                            "answer": qa["answer"]
                        })

                return validated_pairs

            except (json.JSONDecodeError, KeyError, ValueError) as e:
                logging.warning(f"JSON解析错误: {e}, 重试 {attempt + 1}/{max_retries}")
                time.sleep(2)
            except Exception as e:
                logging.error(f"API错误: {e}, 重试 {attempt + 1}/{max_retries}")
                time.sleep(5)

        logging.error(f"生成失败: {text_segment[:50]}...")
        return []

    def is_valid_power_qa(self, question, answer):
        """电力领域问答对质量验证"""
        # 检查是否包含电力关键词
        # power_keywords = [
        #     "电压", "电流", "变压器", "断路器", "线路", "保护", "接地", "绝缘",
        #     "调度", "负荷", "短路", "谐波", "功率", "kV", "继电", "配电",
        #     "变电", "输电", "并网", "频率", "有功", "无功", "电抗", "电纳"
        # ]
        power_keywords = [
            # 基本专业关键词
            "电压", "电流", "变压器", "断路器", "线路", "保护", "接地", "绝缘",
            "调度", "负荷", "短路", "谐波", "功率", "kV", "继电", "配电",
            "变电", "输电", "并网", "频率", "有功", "无功", "电纳",
            # 系统与设备结构
            "发电机", "变电站", "母线", "馈线", "电缆", "架空线", "三相电", "电流互感器",
            "电压互感器", "隔离开关",
            # 保护与自动化
            "继电保护", "过流保护", "距离保护", "差动保护", "保护继电器",
            "SCADA", "ADMS", "微电网", "需求响应",
            # 电能质量与参数
            "有功功率", "无功功率", "视在功率", "功率因数", "频率偏差",
            "谐波畸变", "电涌", "电压暂降", "瞬变", "电涌",
            # 电力电子与转换
            "整流器", "逆变器", "变频器", "静止无功补偿", "有源滤波器",
            "UPS", "储能系统", "BESS",
            # 运行与稳定性
            "自动发电控制", "自动电压控制", "低电压穿越", "高电压穿越",
            "减载", "重合闸", "故障定位", "潮流计算",
            # 测量与计量
            "电阻", "电抗", "阻抗", "电容", "电感", "监控", "计量",
            # 可再生与储能
            "光伏", "风电", "水电", "抽水蓄能", "超级电容", "飞轮储能",
            # 电机与驱动
            "同步电机", "异步电机", "电动机", "变频驱动", "励磁系统",
            # 标准与规范
            "IEC", "IEEE", "NEMA", "NFPA", "GB/T", "标准",
            # 控制理论与电气工程基础
            "PID控制", "PLC", "可编程逻辑控制", "电磁场", "电磁感应",
        ]

        has_keyword = any(kw in question or kw in answer for kw in power_keywords)

        # 检查答案是否具体
        # has_specifics = any(char in answer for char in ["kV", "kA", "MW", "MVA", "Hz", "%", "秒", "分钟", "℃"])

        # 检查答案格式是否规范
        # has_structured_answer = any(marker in answer for marker in ["1.", "①", "第一", "首先"])

        # return has_keyword and has_specifics and has_structured_answer and len(answer) >= 20
        return has_keyword and len(answer) >= 10

    def test_connection(self):
        """测试API连接"""
        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[{"role": "user", "content": "电力系统中的主要设备有哪些?"}],
                temperature=0.0
            )
            return "变压器" in response.choices[0].message.content
        except Exception as e:
            logging.error(f"连接测试失败: {e}")
            return False


def process_excel(input_path, output_excel, api_key, api_base, batch_size=10, max_batches=None):
    """
    处理Excel文件生成问答对并保存到Excel
    Args:
        input_path: 输入Excel路径
        output_excel: 输出Excel路径
        api_key: API密钥
        api_base: API基础URL
        batch_size: 每次处理的文本块数量
        max_batches: 最大处理批次数
    """
    # 读取Excel文件
    try:
        df = pd.read_excel(input_path)
        logging.info(f"成功读取文件: {input_path}, 共 {len(df)} 行")
    except Exception as e:
        logging.error(f"读取Excel失败: {e}")
        raise

    if 'content' not in df.columns:
        error_msg = "Excel文件中缺少'content'列"
        logging.error(error_msg)
        raise ValueError(error_msg)

    # 获取文本块(已切分好的)
    text_blocks = df['content'].dropna().tolist()
    total_blocks = len(text_blocks)

    # 计算总批次数
    total_batches = (total_blocks + batch_size - 1) // batch_size

    # 应用批次数限制
    if max_batches is not None and max_batches > 0:
        total_batches = min(total_batches, max_batches)
        logging.info(f"启用批次限制: 仅处理前 {max_batches} 批")

    # 初始化生成器
    qa_generator = PowerQA_Generator(api_key, api_base)

    # 测试连接
    if not qa_generator.test_connection():
        error_msg = "API连接测试失败,请检查配置"
        logging.error(error_msg)
        raise ConnectionError(error_msg)

    logging.info("API连接测试成功,开始生成问答对...")
    print(f"开始处理 {total_blocks} 个文本块,按{batch_size}个/批分组,共{total_batches}批...")

    # 准备输出数据
    output_data = []
    total_qa = 0

    # 按批次处理文本块
    for batch_idx in tqdm(range(total_batches), desc="生成电力问答对", unit="批"):
        start_idx = batch_idx * batch_size
        end_idx = min((batch_idx + 1) * batch_size, total_blocks)
        batch_blocks = text_blocks[start_idx:end_idx]

        # 合并批内文本块(用分隔符分隔)
        batch_text = "\n---------------\n".join([str(block) for block in batch_blocks])

        try:
            # 处理整个批次的文本
            qa_pairs = qa_generator.generate_qa_pairs(batch_text)

            # 为每个问答对记录来源信息
            for qa in qa_pairs:
                output_data.append({
                    "batch_index": batch_idx + 1,
                    "blocks_range": f"{start_idx + 1}-{end_idx}",
                    "source_blocks": batch_blocks,
                    "question": qa["question"],
                    "answer": qa["answer"]
                })
                total_qa += 1

            # 定期保存进度
            if batch_idx % 2 == 0 and output_data:
                save_intermediate(output_data, output_excel)
                print(f"已处理 {batch_idx + 1}/{total_batches} 批, 生成 {total_qa} 个问答对")
                logging.info(f"进度: {batch_idx + 1}/{total_batches} 批, {total_qa} 问答对")

            time.sleep(1.5)  # 避免速率限制
        except Exception as e:
            logging.error(f"处理批次 {batch_idx + 1} 时出错: {e}")

    # 最终保存
    if output_data:
        save_final_output(output_data, output_excel)
        logging.info(f"处理完成! 共生成 {len(output_data)} 个问答对")
        return len(output_data)
    else:
        logging.warning("未生成任何问答对")
        return 0


def save_intermediate(data, output_path):
    """保存中间结果"""
    temp_df = pd.DataFrame(data)
    # 添加时间戳防止覆盖
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    temp_output = output_path.replace(".xlsx", f"_temp_{timestamp}.xlsx")

    # 简化中间输出(不包含完整来源文本)
    temp_df.drop(columns=['source_blocks'], errors='ignore').to_excel(temp_output, index=False)
    logging.info(f"保存中间结果到: {temp_output}")


def save_final_output(data, output_path):
    """保存最终结果到Excel并添加电力领域分析"""
    if not data:
        logging.warning("没有生成任何问答对,不创建输出文件")
        return

    # 创建DataFrame
    df_output = pd.DataFrame(data)

    # 电力领域特有的后处理
    df_output = power_domain_postprocessing(df_output)

    # 保存到Excel
    df_output.to_excel(output_path, index=False)
    print(f"\n结果已保存到 {output_path}")
    logging.info(f"最终结果保存到 {output_path}, 共 {len(df_output)} 行")

    # 生成分析报告
    generate_analysis_report(df_output, output_path)


def power_domain_postprocessing(df):
    """电力领域特有的后处理"""

    # 添加技术分类
    def classify_question(question):
        question = str(question)
        if any(kw in question for kw in ["原理", "工作", "结构", "功能", "组成"]):
            return "设备原理"
        elif any(kw in question for kw in ["操作", "步骤", "流程", "顺序", "执行"]):
            return "操作规程"
        elif any(kw in question for kw in ["故障", "事故", "问题", "异常", "处理"]):
            return "故障处理"
        elif any(kw in question for kw in ["计算", "公式", "参数", "数值", "系数"]):
            return "参数计算"
        elif any(kw in question for kw in ["安全", "距离", "防护", "措施", "要求"]):
            return "安全规范"
        elif any(kw in question for kw in ["标准", "规范", "规定", "规程", "导则"]):
            return "标准规范"
        return "综合知识"

    df['category'] = df['question'].apply(classify_question)

    # 提取电压等级信息
    def extract_voltage(text):
        if not isinstance(text, str):
            return ""

        voltage_pattern = r"(\d+\.?\d*)\s*(kV|千伏|伏)"
        matches = re.findall(voltage_pattern, text)
        if matches:
            # 去重并排序
            voltages = sorted(set([f"{val}{unit}" for val, unit in matches]),
                              key=lambda x: float(re.findall(r"\d+\.?\d*", x)[0]))
            return ", ".join(voltages)
        return ""

    df['voltage_level'] = df.apply(
        lambda row: extract_voltage(f"{row['question']} {row['answer']}"),
        axis=1
    )

    # 添加技术参数标记
    def contains_technical_params(text):
        if not isinstance(text, str):
            return False
        return any(kw in text for kw in ["kV", "kA", "MW", "MVA", "Hz", "%"])

    df['has_technical_params'] = df['answer'].apply(contains_technical_params)

    # 添加批次信息
    df['batch_info'] = df.apply(
        lambda row: f"批次{row['batch_index']} (块{row['blocks_range']})",
        axis=1
    )

    return df


def generate_analysis_report(df, output_path):
    """生成电力问答对分析报告"""
    report_path = output_path.replace(".xlsx", "_analysis.txt")

    with open(report_path, 'w', encoding='utf-8') as report:
        report.write("电力问答对数据集分析报告\n")
        report.write("=" * 50 + "\n")
        report.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        report.write(f"总问答对数: {len(df)}\n\n")

        # 批次统计
        batch_counts = df['batch_index'].value_counts().sort_index()
        report.write("批次统计:\n")
        for batch, count in batch_counts.items():
            report.write(f"  - 批次 {batch}: {count} 个问答对\n")

        # 分类统计
        report.write("\n问题分类统计:\n")
        category_counts = df['category'].value_counts()
        for category, count in category_counts.items():
            report.write(f"  - {category}: {count} 个 ({count / len(df) * 100:.1f}%)\n")

        # 电压等级分布
        report.write("\n电压等级分布:\n")
        voltage_levels = df[df['voltage_level'] != '']['voltage_level']
        if not voltage_levels.empty:
            voltage_stats = voltage_levels.value_counts().head(10)
            for voltage, count in voltage_stats.items():
                report.write(f"  - {voltage}: {count} 个\n")
        else:
            report.write("  未检测到电压等级信息\n")

        # 技术参数比例
        tech_param_percent = df['has_technical_params'].mean() * 100
        report.write(f"\n包含技术参数的问答比例: {tech_param_percent:.1f}%\n")

        # 示例展示
        report.write("\n优质问答对示例:\n")
        examples = df[df['has_technical_params']].sample(min(5, len(df)), random_state=42)
        for i, (_, row) in enumerate(examples.iterrows(), 1):
            report.write(f"\n示例 {i}:\n")
            report.write(f"批次: {row['batch_index']} (块{row['blocks_range']})\n")
            report.write(f"分类: {row['category']}\n")
            report.write(f"电压等级: {row['voltage_level']}\n")
            report.write(f"问题: {row['question']}\n")
            report.write(f"答案: {row['answer']}\n")

    print(f"分析报告已保存到: {report_path}")
    logging.info(f"生成分析报告: {report_path}")


def main():
    # 命令行参数解析
    parser = argparse.ArgumentParser(description='电力领域问答对生成工具')
    parser.add_argument('--input', type=str, default='chunks_final.xlsx',
                        help='输入Excel文件路径 (默认: power_books.xlsx)')
    parser.add_argument('--output', type=str, default='power_qa_dataset.xlsx',
                        help='输出Excel文件路径 (默认: power_qa_dataset.xlsx)')
    parser.add_argument('--batch_size', type=int, default=10,
                        help='每批处理的文本块数量 (默认: 10)')
    parser.add_argument('--max_batches', type=int, default=None,
                        help='最大处理批次数 (默认: 全部批次)')
    parser.add_argument('--api_key', type=str, default='sk-WVoD66MR',
                        help='OpenAI API密钥')
    parser.add_argument('--api_base', type=str, default='',
                        help='OpenAI API基础URL')

    args = parser.parse_args()

    # 检查文件存在
    if not os.path.exists(args.input):
        print(f"错误: 输入文件 {args.input} 不存在")
        logging.error(f"输入文件不存在: {args.input}")
        exit(1)

    print("=" * 60)
    print("电力领域问答对生成系统")
    print(f"输入文件: {args.input}")
    print(f"输出文件: {args.output}")
    print(f"批处理大小: 每批 {args.batch_size} 个文本块")
    if args.max_batches:
        print(f"处理限制: 前 {args.max_batches} 批")
    print("=" * 60)

    # 处理Excel生成问答对
    try:
        start_time = time.time()
        qa_count = process_excel(
            args.input,
            args.output,
            args.api_key,
            args.api_base,
            batch_size=args.batch_size,
            max_batches=args.max_batches
        )
        elapsed = time.time() - start_time

        if qa_count > 0:
            print(f"\n处理完成! 共生成 {qa_count} 个电力专业问答对")
            print(f"耗时: {elapsed:.2f} 秒 ({elapsed / 60:.2f} 分钟)")
            print(f"结果已保存到 {args.output}")
        else:
            print("\n处理完成,但未生成任何问答对,请检查输入数据和API连接")

    except Exception as e:
        print(f"\n处理失败: {e}")
        logging.exception("处理过程中发生严重错误")


if __name__ == "__main__":
    main()

总结

该代码用简洁的逻辑串起 「文本分块 → LLM 生成 → 规则过滤 → 行业后处理 → 多维报告」 的完整流水线,是电力领域知识提炼的即插即用范例。

按需替换 Prompt、关键词与后处理逻辑,即可在其他垂直行业快速复用,值得深入学习与二次开发。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐