基于大模型自动化生成问答对数据集
本文介绍了一个面向电力领域的问答数据集生成工具,通过OpenAI API将电力技术文本自动转化为结构化QA对。该工具采用批处理架构,包含文本分块、LLM生成、规则过滤、领域后处理等模块,支持电压等级提取、技术参数标记等电力专属功能。
为什么需要提取Q&A?主要有两个用途,一个用途是为了去训练模型,让模型朝着我希望的方向发展;另一个是可以提高RAG的准确率,提高相关性,准确性,使得用户的问题能够更好的匹配查询,避免大模型胡乱生成答案。
目录
前言
提供一个面向电力技术文本的“问答数据集”生成工具,利用 OpenAI 大模型(可接私有 / 代理 API)自动把零散的专业文本切片批量转化为高质量、结构化的 问题–答案 (Q&A)。
典型应用场景:
- 快速构建电力行业知识库 / 向量库。
- 生成 LLM 对电力场景的指令微调数据。
- 自助创建运维、电网培训题库。
一、整体架构
graph TD
A[命令行入口 main()] --> B[process_excel()]
B --> C[读取 Excel<br/>content 列]
C --> D[批量拼接文本 batch_text]
D --> E[PowerQA_Generator.generate_qa_pairs()]
E -->|JSON Q&A| F[验证 + 过滤]
F --> G{中间保存?}
G -->|是| H[save_intermediate()]
G -->|否| I[继续循环]
I --> J[全部结束后<br/>save_final_output()]
J --> K[generate_analysis_report()]
二、依赖与运行环境
- Python ≥3.8
- pandas、tqdm、openai、argparse、logging、re 等标准库/三方库
(requirements.txt 推荐:pandas>=2.0.0, openai>=1.10.0, tqdm>=4.60)
三、核心类 PowerQA_Generator
| 方法 | 作用 | 关键实现 |
|---|---|---|
| __init__() | 建立 OpenAI client 与 Prompt 模板 | 「多文本片段」提示词,嵌入电力子领域要求 |
| generate_qa_pairs() | 与 LLM 交互,拿到 JSON 格式的 Q&A | 检查嵌套 ```json```、多次重试、日志 |
| is_valid_power_qa() | 行业词汇 + 最小长度过滤 | 电力关键词列表近百条,防止跑偏 |
| test_connection() | 预检 API 连通性 | 可避免长时间空跑 |
四、Excel 批处理流程
1. 读取– process_excel()
读取指定 Excel 文件,要求存在 content 列(已切好的文本块 / 子句)。
2. 批次划分
batch_size 决定一次聚合几个文本片段,避免 prompt 超长。
3. 合并片段
同批用 --------------- 分隔,作为单轮 prompt 输入 LLM。
4. 生成与过滤
调用 generate_qa_pairs() 拿到一段 JSON;再与关键词白名单比对过滤噪声。
5. 中间结果落盘
每 2 个批次自动调用 save_intermediate() 生成临时 XLSX,容灾友好。
6. 最终输出
save_final_output() 写成汇总表,并追加行业后处理字段(见下一节)。
五、电力领域专属后处理
函数 power_domain_postprocessing() 会对每条 Q&A 做增值加工:
- 问题 技术分类(设备原理 / 操作规程 / 故障处理 / …)。
- 正则提取 电压等级(如 10 kV、500 kV)。
- 标记答案中是否含有 技术参数(kV、Hz、%…)。
- 衍生字段 batch_info 方便回溯来源。
再由 generate_analysis_report() 输出文字版统计报告,包含:
- 批次、分类分布
- 电压等级 Top N
- 技术参数占比
- 随机抽样 5 条高质量示例
六、命令行用法
# 命令行参数解析
parser = argparse.ArgumentParser(description='电力领域问答对生成工具')
parser.add_argument('--input', type=str, default='chunks_final.xlsx',
help='输入Excel文件路径 (默认: power_books.xlsx)')
parser.add_argument('--output', type=str, default='power_qa_dataset.xlsx',
help='输出Excel文件路径 (默认: power_qa_dataset.xlsx)')
parser.add_argument('--batch_size', type=int, default=10,
help='每批处理的文本块数量 (默认: 10)')
parser.add_argument('--max_batches', type=int, default=None,
help='最大处理批次数 (默认: 全部批次)')
parser.add_argument('--api_key', type=str, default='sk-WVoD66MR',
help='OpenAI API密钥')
parser.add_argument('--api_base', type=str, default='',
help='OpenAI API基础URL')
常用参数说明:
| 参数 | 默认 | 作用 |
|---|---|---|
| --input | kv_store_text_chunks_final.xlsx | 含文本片段的源文件 |
| --output | power_qa_dataset.xlsx | 目标问答 Excel |
| --batch_size | 10 | 每批拼接多少文本片段 |
| --max_batches | None | 只跑前 N 批,调试友好 |
| --api_key / --api_base | 示例值 | 私有化 / 国内代理可替换 |
七、日志与容错
- 全程写入 power_qa_generator.log,包含原始 JSON 响应、防止定位难。
- JSON 解析失败、API 429/500 等自动重试;依旧失败则跳过当前批,流程不中断。
- time.sleep(1.5) 控速;可视情况调整。
八、可扩展点与优化建议
- 并发调度:当前串行调用,配合 asyncio 或多线程 + token 速率窗可提速数倍。
- 关键字池:is_valid_power_qa() 可外置到 YAML/JSON,便于持续更新。
- Prompt 参数化:把子领域列表、问题类型枚举拆为配置文件,支持跨行业复用。
- 向量去重:生成后可计算语义相似度,剔除冗余问答,提升数据质量。
- Web UI:封装为 Gradio / Streamlit,使非技术人员也能上传 Excel 一键生成。
九、完整代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pandas as pd
import json
import time
import re
import os
import logging
import argparse
from openai import OpenAI
from tqdm import tqdm
from datetime import datetime
# 配置日志
logging.basicConfig(
filename='power_qa_generator.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
class PowerQA_Generator:
def __init__(self, api_key, api_base, model_name="gpt-4o"):
self.client = OpenAI(api_key=api_key, base_url=api_base)
self.model_name = model_name
# 电力领域优化的提示词模板(支持多文本块)
self.prompt_template = """
您是一位电力系统专家,负责从电力技术文本中创建高质量的问答数据集。请基于下面提供的多个相关文本片段,严格按照内容生成专业、准确的问答对。
## 专业要求:
1. 生成1-10个不同角度的专业问题,特别关注以下电力领域:
- 电力设备技术:变压器、断路器、继电保护等设备的工作原理、技术参数和维护要求
- 电网运行:频率控制、电压调节、稳定性分析、负荷预测等
- 电力安全:操作规程、安全距离、防护措施、事故预防
- 故障处理:短路计算、保护动作、事故分析流程
- 新能源技术:光伏/风电并网技术,储能系统应用
- 电力经济:电价机制、市场交易、成本分析
- 标准规范:国家标准(GB)、行业规范(DL)、技术导则
2. 问题类型应覆盖多个文本片段的内容:
[定义类] 如:什么是差动保护?其工作原理是什么?
[参数类] 如:220kV变压器的空载损耗标准值是多少?
[流程类] 如:倒闸操作的基本步骤和安全注意事项有哪些?
[故障类] 如:发生单相接地故障时有哪些现象?如何处理?
[计算类] 如:如何计算短路电流?需要考虑哪些因素?
[安全类] 如:10kV线路的安全距离是多少?在哪些情况下需要增加距离?
[标准类] 如:GB 50150-2016对变压器交接试验有哪些要求?
3. 答案要求:
- 严格基于文本内容,准确简洁(不超过500字)
- 包含具体技术参数(如电压等级、时间要求、数值范围等)
- 使用专业术语(如:绝缘子、谐波、功率因数、继电保护等)
- 整合多个文本片段的信息形成完整答案
- 避免生成文本中未提及的内容
4. 使用JSON格式输出:{"questions": [{"question": "...", "answer": "..."}, ...]}
## 电力专业文本片段(由多个相关段落组成):
{text_segment}
"""
def generate_qa_pairs(self, text_segment, max_retries=3):
"""生成电力专业问答对(支持多文本块)"""
# prompt = self.prompt_template.format(text_segment=text_segment)
# 使用 replace 而不是 format,避免模板中 JSON 示例里的花括号被误解析
prompt = self.prompt_template.replace("{text_segment}", text_segment)
for attempt in range(max_retries):
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": "您是电力系统高级工程师,专注于从多个技术文本中提取和整合知识,只能输出 JSON。"},
{"role": "user", "content": prompt + "\n\n请严格仅输出 JSON 字符串,不要添加解释文字。"}
],
temperature=0.2,
response_format={"type": "json_object"}
)
# 解析JSON响应
json_str = response.choices[0].message.content.strip()
logging.info(f"RAW_RESPONSE: {json_str}")
# 处理可能的 ```json ... ``` 代码块
if json_str.startswith("```"):
# 去掉开头与结尾的```及语言标记
json_str = re.sub(r"^```[a-zA-Z]*\s*|```$", "", json_str, flags=re.S).strip()
# 若仍不是以 { 开头,尝试提取首个 JSON 对象
if not json_str.startswith("{"):
match = re.search(r"\{.*\}", json_str, re.S)
if match:
json_str = match.group(0)
qa_data = json.loads(json_str)
# 验证数据结构
if "questions" not in qa_data or not isinstance(qa_data["questions"], list):
raise ValueError("响应格式错误: 缺少'questions'列表")
# 清理答案并添加电力领域特殊处理
validated_pairs = []
for qa in qa_data["questions"]:
# 验证必要字段
if "question" not in qa or "answer" not in qa:
continue
# 清理文本
qa["question"] = re.sub(r'\s+', ' ', qa["question"]).strip()
qa["answer"] = re.sub(r'\s+', ' ', qa["answer"]).strip()
# 电力领域质量检查
if self.is_valid_power_qa(qa["question"], qa["answer"]):
validated_pairs.append({
"question": qa["question"],
"answer": qa["answer"]
})
return validated_pairs
except (json.JSONDecodeError, KeyError, ValueError) as e:
logging.warning(f"JSON解析错误: {e}, 重试 {attempt + 1}/{max_retries}")
time.sleep(2)
except Exception as e:
logging.error(f"API错误: {e}, 重试 {attempt + 1}/{max_retries}")
time.sleep(5)
logging.error(f"生成失败: {text_segment[:50]}...")
return []
def is_valid_power_qa(self, question, answer):
"""电力领域问答对质量验证"""
# 检查是否包含电力关键词
# power_keywords = [
# "电压", "电流", "变压器", "断路器", "线路", "保护", "接地", "绝缘",
# "调度", "负荷", "短路", "谐波", "功率", "kV", "继电", "配电",
# "变电", "输电", "并网", "频率", "有功", "无功", "电抗", "电纳"
# ]
power_keywords = [
# 基本专业关键词
"电压", "电流", "变压器", "断路器", "线路", "保护", "接地", "绝缘",
"调度", "负荷", "短路", "谐波", "功率", "kV", "继电", "配电",
"变电", "输电", "并网", "频率", "有功", "无功", "电纳",
# 系统与设备结构
"发电机", "变电站", "母线", "馈线", "电缆", "架空线", "三相电", "电流互感器",
"电压互感器", "隔离开关",
# 保护与自动化
"继电保护", "过流保护", "距离保护", "差动保护", "保护继电器",
"SCADA", "ADMS", "微电网", "需求响应",
# 电能质量与参数
"有功功率", "无功功率", "视在功率", "功率因数", "频率偏差",
"谐波畸变", "电涌", "电压暂降", "瞬变", "电涌",
# 电力电子与转换
"整流器", "逆变器", "变频器", "静止无功补偿", "有源滤波器",
"UPS", "储能系统", "BESS",
# 运行与稳定性
"自动发电控制", "自动电压控制", "低电压穿越", "高电压穿越",
"减载", "重合闸", "故障定位", "潮流计算",
# 测量与计量
"电阻", "电抗", "阻抗", "电容", "电感", "监控", "计量",
# 可再生与储能
"光伏", "风电", "水电", "抽水蓄能", "超级电容", "飞轮储能",
# 电机与驱动
"同步电机", "异步电机", "电动机", "变频驱动", "励磁系统",
# 标准与规范
"IEC", "IEEE", "NEMA", "NFPA", "GB/T", "标准",
# 控制理论与电气工程基础
"PID控制", "PLC", "可编程逻辑控制", "电磁场", "电磁感应",
]
has_keyword = any(kw in question or kw in answer for kw in power_keywords)
# 检查答案是否具体
# has_specifics = any(char in answer for char in ["kV", "kA", "MW", "MVA", "Hz", "%", "秒", "分钟", "℃"])
# 检查答案格式是否规范
# has_structured_answer = any(marker in answer for marker in ["1.", "①", "第一", "首先"])
# return has_keyword and has_specifics and has_structured_answer and len(answer) >= 20
return has_keyword and len(answer) >= 10
def test_connection(self):
"""测试API连接"""
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "user", "content": "电力系统中的主要设备有哪些?"}],
temperature=0.0
)
return "变压器" in response.choices[0].message.content
except Exception as e:
logging.error(f"连接测试失败: {e}")
return False
def process_excel(input_path, output_excel, api_key, api_base, batch_size=10, max_batches=None):
"""
处理Excel文件生成问答对并保存到Excel
Args:
input_path: 输入Excel路径
output_excel: 输出Excel路径
api_key: API密钥
api_base: API基础URL
batch_size: 每次处理的文本块数量
max_batches: 最大处理批次数
"""
# 读取Excel文件
try:
df = pd.read_excel(input_path)
logging.info(f"成功读取文件: {input_path}, 共 {len(df)} 行")
except Exception as e:
logging.error(f"读取Excel失败: {e}")
raise
if 'content' not in df.columns:
error_msg = "Excel文件中缺少'content'列"
logging.error(error_msg)
raise ValueError(error_msg)
# 获取文本块(已切分好的)
text_blocks = df['content'].dropna().tolist()
total_blocks = len(text_blocks)
# 计算总批次数
total_batches = (total_blocks + batch_size - 1) // batch_size
# 应用批次数限制
if max_batches is not None and max_batches > 0:
total_batches = min(total_batches, max_batches)
logging.info(f"启用批次限制: 仅处理前 {max_batches} 批")
# 初始化生成器
qa_generator = PowerQA_Generator(api_key, api_base)
# 测试连接
if not qa_generator.test_connection():
error_msg = "API连接测试失败,请检查配置"
logging.error(error_msg)
raise ConnectionError(error_msg)
logging.info("API连接测试成功,开始生成问答对...")
print(f"开始处理 {total_blocks} 个文本块,按{batch_size}个/批分组,共{total_batches}批...")
# 准备输出数据
output_data = []
total_qa = 0
# 按批次处理文本块
for batch_idx in tqdm(range(total_batches), desc="生成电力问答对", unit="批"):
start_idx = batch_idx * batch_size
end_idx = min((batch_idx + 1) * batch_size, total_blocks)
batch_blocks = text_blocks[start_idx:end_idx]
# 合并批内文本块(用分隔符分隔)
batch_text = "\n---------------\n".join([str(block) for block in batch_blocks])
try:
# 处理整个批次的文本
qa_pairs = qa_generator.generate_qa_pairs(batch_text)
# 为每个问答对记录来源信息
for qa in qa_pairs:
output_data.append({
"batch_index": batch_idx + 1,
"blocks_range": f"{start_idx + 1}-{end_idx}",
"source_blocks": batch_blocks,
"question": qa["question"],
"answer": qa["answer"]
})
total_qa += 1
# 定期保存进度
if batch_idx % 2 == 0 and output_data:
save_intermediate(output_data, output_excel)
print(f"已处理 {batch_idx + 1}/{total_batches} 批, 生成 {total_qa} 个问答对")
logging.info(f"进度: {batch_idx + 1}/{total_batches} 批, {total_qa} 问答对")
time.sleep(1.5) # 避免速率限制
except Exception as e:
logging.error(f"处理批次 {batch_idx + 1} 时出错: {e}")
# 最终保存
if output_data:
save_final_output(output_data, output_excel)
logging.info(f"处理完成! 共生成 {len(output_data)} 个问答对")
return len(output_data)
else:
logging.warning("未生成任何问答对")
return 0
def save_intermediate(data, output_path):
"""保存中间结果"""
temp_df = pd.DataFrame(data)
# 添加时间戳防止覆盖
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_output = output_path.replace(".xlsx", f"_temp_{timestamp}.xlsx")
# 简化中间输出(不包含完整来源文本)
temp_df.drop(columns=['source_blocks'], errors='ignore').to_excel(temp_output, index=False)
logging.info(f"保存中间结果到: {temp_output}")
def save_final_output(data, output_path):
"""保存最终结果到Excel并添加电力领域分析"""
if not data:
logging.warning("没有生成任何问答对,不创建输出文件")
return
# 创建DataFrame
df_output = pd.DataFrame(data)
# 电力领域特有的后处理
df_output = power_domain_postprocessing(df_output)
# 保存到Excel
df_output.to_excel(output_path, index=False)
print(f"\n结果已保存到 {output_path}")
logging.info(f"最终结果保存到 {output_path}, 共 {len(df_output)} 行")
# 生成分析报告
generate_analysis_report(df_output, output_path)
def power_domain_postprocessing(df):
"""电力领域特有的后处理"""
# 添加技术分类
def classify_question(question):
question = str(question)
if any(kw in question for kw in ["原理", "工作", "结构", "功能", "组成"]):
return "设备原理"
elif any(kw in question for kw in ["操作", "步骤", "流程", "顺序", "执行"]):
return "操作规程"
elif any(kw in question for kw in ["故障", "事故", "问题", "异常", "处理"]):
return "故障处理"
elif any(kw in question for kw in ["计算", "公式", "参数", "数值", "系数"]):
return "参数计算"
elif any(kw in question for kw in ["安全", "距离", "防护", "措施", "要求"]):
return "安全规范"
elif any(kw in question for kw in ["标准", "规范", "规定", "规程", "导则"]):
return "标准规范"
return "综合知识"
df['category'] = df['question'].apply(classify_question)
# 提取电压等级信息
def extract_voltage(text):
if not isinstance(text, str):
return ""
voltage_pattern = r"(\d+\.?\d*)\s*(kV|千伏|伏)"
matches = re.findall(voltage_pattern, text)
if matches:
# 去重并排序
voltages = sorted(set([f"{val}{unit}" for val, unit in matches]),
key=lambda x: float(re.findall(r"\d+\.?\d*", x)[0]))
return ", ".join(voltages)
return ""
df['voltage_level'] = df.apply(
lambda row: extract_voltage(f"{row['question']} {row['answer']}"),
axis=1
)
# 添加技术参数标记
def contains_technical_params(text):
if not isinstance(text, str):
return False
return any(kw in text for kw in ["kV", "kA", "MW", "MVA", "Hz", "%"])
df['has_technical_params'] = df['answer'].apply(contains_technical_params)
# 添加批次信息
df['batch_info'] = df.apply(
lambda row: f"批次{row['batch_index']} (块{row['blocks_range']})",
axis=1
)
return df
def generate_analysis_report(df, output_path):
"""生成电力问答对分析报告"""
report_path = output_path.replace(".xlsx", "_analysis.txt")
with open(report_path, 'w', encoding='utf-8') as report:
report.write("电力问答对数据集分析报告\n")
report.write("=" * 50 + "\n")
report.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
report.write(f"总问答对数: {len(df)}\n\n")
# 批次统计
batch_counts = df['batch_index'].value_counts().sort_index()
report.write("批次统计:\n")
for batch, count in batch_counts.items():
report.write(f" - 批次 {batch}: {count} 个问答对\n")
# 分类统计
report.write("\n问题分类统计:\n")
category_counts = df['category'].value_counts()
for category, count in category_counts.items():
report.write(f" - {category}: {count} 个 ({count / len(df) * 100:.1f}%)\n")
# 电压等级分布
report.write("\n电压等级分布:\n")
voltage_levels = df[df['voltage_level'] != '']['voltage_level']
if not voltage_levels.empty:
voltage_stats = voltage_levels.value_counts().head(10)
for voltage, count in voltage_stats.items():
report.write(f" - {voltage}: {count} 个\n")
else:
report.write(" 未检测到电压等级信息\n")
# 技术参数比例
tech_param_percent = df['has_technical_params'].mean() * 100
report.write(f"\n包含技术参数的问答比例: {tech_param_percent:.1f}%\n")
# 示例展示
report.write("\n优质问答对示例:\n")
examples = df[df['has_technical_params']].sample(min(5, len(df)), random_state=42)
for i, (_, row) in enumerate(examples.iterrows(), 1):
report.write(f"\n示例 {i}:\n")
report.write(f"批次: {row['batch_index']} (块{row['blocks_range']})\n")
report.write(f"分类: {row['category']}\n")
report.write(f"电压等级: {row['voltage_level']}\n")
report.write(f"问题: {row['question']}\n")
report.write(f"答案: {row['answer']}\n")
print(f"分析报告已保存到: {report_path}")
logging.info(f"生成分析报告: {report_path}")
def main():
# 命令行参数解析
parser = argparse.ArgumentParser(description='电力领域问答对生成工具')
parser.add_argument('--input', type=str, default='chunks_final.xlsx',
help='输入Excel文件路径 (默认: power_books.xlsx)')
parser.add_argument('--output', type=str, default='power_qa_dataset.xlsx',
help='输出Excel文件路径 (默认: power_qa_dataset.xlsx)')
parser.add_argument('--batch_size', type=int, default=10,
help='每批处理的文本块数量 (默认: 10)')
parser.add_argument('--max_batches', type=int, default=None,
help='最大处理批次数 (默认: 全部批次)')
parser.add_argument('--api_key', type=str, default='sk-WVoD66MR',
help='OpenAI API密钥')
parser.add_argument('--api_base', type=str, default='',
help='OpenAI API基础URL')
args = parser.parse_args()
# 检查文件存在
if not os.path.exists(args.input):
print(f"错误: 输入文件 {args.input} 不存在")
logging.error(f"输入文件不存在: {args.input}")
exit(1)
print("=" * 60)
print("电力领域问答对生成系统")
print(f"输入文件: {args.input}")
print(f"输出文件: {args.output}")
print(f"批处理大小: 每批 {args.batch_size} 个文本块")
if args.max_batches:
print(f"处理限制: 前 {args.max_batches} 批")
print("=" * 60)
# 处理Excel生成问答对
try:
start_time = time.time()
qa_count = process_excel(
args.input,
args.output,
args.api_key,
args.api_base,
batch_size=args.batch_size,
max_batches=args.max_batches
)
elapsed = time.time() - start_time
if qa_count > 0:
print(f"\n处理完成! 共生成 {qa_count} 个电力专业问答对")
print(f"耗时: {elapsed:.2f} 秒 ({elapsed / 60:.2f} 分钟)")
print(f"结果已保存到 {args.output}")
else:
print("\n处理完成,但未生成任何问答对,请检查输入数据和API连接")
except Exception as e:
print(f"\n处理失败: {e}")
logging.exception("处理过程中发生严重错误")
if __name__ == "__main__":
main()
总结
该代码用简洁的逻辑串起 「文本分块 → LLM 生成 → 规则过滤 → 行业后处理 → 多维报告」 的完整流水线,是电力领域知识提炼的即插即用范例。
按需替换 Prompt、关键词与后处理逻辑,即可在其他垂直行业快速复用,值得深入学习与二次开发。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)