【第二章】大模型预训练全解析:定义、数据处理、流程及多阶段训练逻辑
本文系统解析大模型预训练全流程:定义:通过大规模无标注语料的自监督学习(Next Token Prediction),使模型掌握通用知识和模式。数据处理:涵盖数据来源(网络/书籍)、清洗(去重/去噪)、分词(BPE等)及多阶段(PT/SFT/RM/PPO)的数据格式差异。训练逻辑:PT阶段:整段文本损失计算SFT阶段:仅计算Response部分的损失(通过-100屏蔽Prompt)统一使用交叉熵损

接上一章《【第一章】大模型预训练全解析:定义、数据处理、流程及多阶段训练逻辑》
算法与并行策略
两阶段优化
1. 预训练:
2. 微调: with initial
prompt 工程
无需改动参数,通过上下文,让模型在生成时自适应:
下面的代码片段来自LLaMA-Efficient-Tuning代码库,其中preprocess.py文件里面定义了不同阶段数据处理的方式。
1). 根据不同训练阶段对数据集进行预处理
def preprocess_dataset(
# dataset:输入数据集,支持常规或流式数据集
dataset: Union["Dataset", "IterableDataset"],
# tokenizer:分词器,用于将文本分成token
tokenizer: "PreTrainedTokenizer",
# args 是 arguments(参数)的缩写,通常表示一组配置参数
# DataArguments:配置数据加载和预处理参数
data_args: "DataArguments",
# Seq2SeqTrainingArguments:序列到序列模型(如 T5、BART)的训练参数配置类
training_args: "Seq2SeqTrainingArguments", # 使用引号,Python 会在运行时解析类型,允许你提前使用未导入的类型,避免循环依赖问题
# stage:训练阶段,可选值为 "pt"(预训练)、"sft"(监督微调)、"rm"(奖励模型)、"ppo"(策略优化)
stage: Literal["pt", "sft", "rm", "ppo"]
) -> Union["Dataset", "IterableDataset"]:
2). 数据结构准备
column_names = list(next(iter(dataset)).keys())
template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
- column_names = list(next(iter(dataset)).keys())
功能
获取数据集的列名(即字段名称),通常用于了解数据集的结构。
执行步骤
iter(dataset):将数据集转换为迭代器,以便逐行访问数据。
next(iter(dataset)):从迭代器中取出第一个样本(即数据集的第一行)。
next(...).keys():获取该样本的所有键(key),即列名。
list(...):将键转换为列表格式,便于后续操作。
示例
假设数据集的第一行为 {'text': 'Hello', 'label': 1},则 column_names 为 ['text', 'label']。
- template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
功能
获取模板并修复分词器,确保输入文本能正确被模型处理
参数解析
data_args.template:配置文件中定义的模板字符串,用于格式化输入数据
例如:"问题:{text}\n答案:{label}"
get_template_and_fix_tokenizer操作:
解析模板:提取模板中的特殊标记(如 {text}、{label}),用于后续数据填充。 修复分词器: 为模板中的特殊符号(如 \n、自定义分隔符)添加特殊 token。确保分词器能正确处理模板中的占位符(如 {text}) 返回处理后的模板:可能是一个函数或对象,用于后续数据格式化
示例
# 假设dataset是一个包含文本和标签的数据集
dataset = [
{'text': '今天天气如何', 'label': '晴天'},
{'text': '明天有雨吗', 'label': '有雨'}
]
# 步骤1:获取列名
column_names = list(next(iter(dataset)).keys())
# 输出:['text', 'label']
# 步骤2:获取模板并修复分词器
# 假设template配置为 "问题:{text}\n答案:{label}"
template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
# 后续使用template格式化数据
formatted_text = template(dataset[0])
# 输出:"问题:今天天气如何\n答案:晴天"
3).数据生成器
def construct_example(examples: Dict[str, List[Any]]) -> Generator[Any, None, None]:
for i in range(len(examples["prompt"])):
query = examples["prompt"][i]
if "query" in examples and examples["query"][i]:
query = query + "\n" + examples["query"][i]
response = examples["response"][i] if "response" in examples else None
history = examples["history"][i] if "history" in examples else None
system = examples["system"][i] if "system" in examples else None
yield query, response, history, system
示例
示例输入数据
假设输入的 examples 是一个包含4个字段的字典,对应2个样本:
examples = {
"prompt": ["你是谁", "今天星期几"], # 必选字段,每个样本的基础提示
"query": ["请用简洁语言回答", ""], # 可选字段,附加查询(第二个样本为空)
"response": ["我是AI助手豆包", "今天星期五"], # 响应(标签)
"history": [None, [("昨天星期几", "昨天星期四")]], # 对话历史(第二个样本有历史记录)
"system": ["通用模式", None] # 系统提示(第二个样本无系统提示)
}
逐行代码执行过程与输出
** 函数定义与参数**
def construct_example(examples: Dict[str, List[Any]]) -> Generator[Any, None, None]:
- 输入:examples 是上述字典,包含2个样本的各字段列表。
- 输出:生成器,逐个返回处理后的样本元组。
** 循环遍历样本(i=0 时处理第一个样本)
for i in range(len(examples["prompt"])): # len(examples["prompt"]) = 2,循环i=0和i=1
- **当前 i=0:处理第一个样本(prompt 为“你是谁”)。
** 提取并处理 query**
query = examples["prompt"][i] # examples["prompt"][0] = "你是谁"
if "query" in examples and examples["query"][i]: # examples["query"][0] = "请用简洁语言回答"(非空)
query = query + "\n" + examples["query"][i] # 拼接后:"你是谁\n请用简洁语言回答"
- 处理后 query:"你是谁\n请用简洁语言回答"。
** 提取可选字段(response, history, system)
response = examples["response"][i] if "response" in examples else None # examples["response"][0] = "我是AI助手豆包"
history = examples["history"][i] if "history" in examples else None # examples["history"][0] = None
system = examples["system"][i] if "system" in examples else None # examples["system"][0] = "通用模式"
- **response:"我是AI助手豆包"
- history:None
- system:"通用模式"
生成第一个样本
yield query, response, history, system
- 输出元组:("你是谁\n请用简洁语言回答", "我是AI助手豆包", None, "通用模式")
循环再次执行(i=1 时处理第二个样本)
# i=1,处理第二个样本(prompt为"今天星期几")
query = examples["prompt"][1] # "今天星期几"
if "query" in examples and examples["query"][1]: # examples["query"][1] = ""(空字符串,条件不成立)
# 不执行拼接
response = examples["response"][1] # "今天星期五"
history = examples["history"][1] # [("昨天星期几", "昨天星期四")]
system = examples["system"][1] # None
yield query, response, history, system
- 处理后
query:保持原值"今天星期几"。 - 输出元组:
("今天星期几", "今天星期五", [("昨天星期几", "昨天星期四")], None)
完整输出结果
通过生成器迭代输出的两个样本为:
-
("你是谁\n请用简洁语言回答", "我是AI助手豆包", None, "通用模式") -
("今天星期几", "今天星期五", [("昨天星期几", "昨天星期四")], None)
代码作用总结
| 代码行 | 作用描述 |
|---|---|
len(examples["prompt"]) |
确定样本总数(通过必选字段 prompt 的长度)。 |
query = examples["prompt"][i] |
提取每个样本的基础提示文本。 |
拼接 query |
若存在附加查询字段 query 且非空,将其与基础提示用换行符拼接,丰富输入。 |
| 条件提取可选字段 | 处理 response、history、system 字段,不存在时设为 None。 |
yield 生成元组 |
返回处理后的样本,包含输入查询、响应、历史对话和系统提示。 |
4). 预训练数据处理(PT)
def preprocess_pretrain_dataset(examples: Dict[str, List[Any]]) -> Dict[str, Any]:
tokenized_examples = tokenizer(examples["prompt"], add_special_tokens=False)
concatenated_examples = {k: list(chain(*tokenized_examples[k])) for k in tokenized_examples.keys()}
total_length = (len(concatenated_examples[list(concatenated_examples.keys())[0]]) // block_size) * block_size
result = {
k: [t[i: i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
return result
示例输入数据
假设输入的examples包含2个样本,tokenizer将文本转换为token ID,block_size=4:
examples = {
"prompt": ["Hello world", "How are you today"]
}
# 假设tokenizer的分词结果(简化示例):
# "Hello world" → [101, 102]
# "How are you today" → [103, 104, 105, 106]
逐行代码执行过程与输出
分词处理
tokenized_examples = tokenizer(examples["prompt"], add_special_tokens=False)
- 分词结果(假设):
tokenized_examples = {
"input_ids": [[101, 102], [103, 104, 105, 106]],
"attention_mask": [[1, 1], [1, 1, 1, 1]]
}
拼接所有样本的tokens
concatenated_examples = {k: list(chain(*tokenized_examples[k])) for k in tokenized_examples.keys()}
- 拼接后:
concatenated_examples = {
"input_ids": [101, 102, 103, 104, 105, 106],
"attention_mask": [1, 1, 1, 1, 1, 1]
}
计算总长度并截断为block_size的整数倍
total_length = (len(concatenated_examples["input_ids"]) // block_size) * block_size
# len=6,block_size=4 → total_length = (6//4)*4 = 4
- 结果:total_length = 4(丢弃剩余的2个token)。
按block_size切分tokens
result = {
k: [t[i: i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
- 切分后:
result = {
"input_ids": [[101, 102, 103, 104]], # 丢弃[105, 106]
"attention_mask": [[1, 1, 1, 1]] # 丢弃[1, 1]
}
完整输出结果
{
"input_ids": [[101, 102, 103, 104]],
"attention_mask": [[1, 1, 1, 1]]
}
代码作用总结
| 代码行 | 作用描述 |
|---|---|
tokenizer(...) |
将所有样本的文本转换为token ID(不添加特殊token)。 |
chain(*tokenized_examples[k]) |
将所有样本的tokens拼接成一个长列表(用于连续训练)。 |
total_length计算 |
截断tokens总长度为block_size的整数倍,确保能均匀切分。 |
| 按块切分 |
将长tokens列表切分为固定长度的块(每个块长度=block_size),丢弃不足的部分。 |
后面继续更新 大模型预训练第二章
如果您认为博文还不错,请帮忙点赞、收藏、关注。您的反馈是我的原动力
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)