A Code-Level Walkthrough of Pretraining, Fine-Tuning, and Inference with the SWIFT Framework
Mastering the Full LLM Workflow from Scratch: A Practical Guide to Pretraining, Fine-Tuning, and Inference with SWIFT
1. Overview of the LLM Technology Landscape
1.1 The Modern LLM Technology Stack
Modern large language models (LLMs) sit at the intersection of deep learning, natural language processing, and distributed computing. The self-attention mechanism of the core Transformer architecture enables parallel processing of long sequences, laying the foundation for complex language tasks.
Key layers of the stack:
- Infrastructure: distributed training frameworks (PyTorch, DeepSpeed), mixed-precision training
- Core model: the Transformer architecture and attention variants (e.g., FlashAttention)
- Efficiency: parameter-efficient fine-tuning (LoRA, QLoRA), quantization techniques
- Application tooling: inference frameworks and deployment toolchains
1.2 SWIFT: Positioning and Strengths
SWIFT (Scalable lightWeight Infrastructure for Fine-Tuning) is an open-source, lightweight training and inference framework for large models released by Alibaba. Its core value:
# SWIFT core design principles
1. Unified API design - one-line integration of HuggingFace and ModelScope models
2. Built-in efficient fine-tuning - LoRA, QLoRA, Adapter, and other PEFT methods
3. Training acceleration - gradient checkpointing, mixed precision, ZeRO optimization
4. Full-pipeline coverage - a complete pipeline from data preprocessing to model deployment
2. Pretraining: Building Language Understanding from Scratch
2.1 Core Principles of Pretraining
2.1.1 The Self-Supervised Learning Paradigm
Pretraining relies on self-supervised learning; the canonical BERT-style objectives are masked language modeling (MLM) and next sentence prediction (NSP). By consuming large volumes of unlabeled text, the model learns the statistical regularities and semantic representations of language.
Mathematical formulation:
For an input sequence X = [x₁, x₂, ..., xₙ], the MLM objective maximizes P(x_masked | x_context).
15% of the tokens are selected for masking; of those selected:
- 80% are replaced with [MASK]
- 10% are replaced with a random token
- 10% are kept unchanged
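Written as a loss, with M the set of masked positions and X̃ the corrupted sequence, the MLM objective minimizes the negative log-likelihood over masked positions only:

$$\mathcal{L}_{\text{MLM}} = -\sum_{i \in M} \log P_\theta\!\left(x_i \mid \tilde{X}\right)$$

This is exactly why the labels of all unmasked positions are set to -100 in the data pipeline below: the cross-entropy loss ignores them.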
2.1.2 A Closer Look at the Transformer Architecture
# A simplified Transformer encoder layer
import math
import torch
import torch.nn as nn

class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model=768, n_heads=12, dim_feedforward=3072):
        super().__init__()
        # Multi-head self-attention (batch_first=True: inputs are [batch, seq, dim])
        self.self_attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        # Position-wise feed-forward network
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),  # commonly used activation
            nn.Linear(dim_feedforward, d_model)
        )
        # Layer normalization
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, attention_mask=None):
        # Self-attention with residual connection + layer norm (post-LN)
        attn_output, _ = self.self_attn(
            x, x, x,
            attn_mask=attention_mask
        )
        x = self.norm1(x + attn_output)
        # Feed-forward with residual connection + layer norm
        ffn_output = self.ffn(x)
        x = self.norm2(x + ffn_output)
        return x
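A quick shape check (illustrative only): feed a random batch through the layer and confirm the output shape matches the input.

layer = TransformerEncoderLayer()
x = torch.randn(2, 128, 768)   # (batch, seq_len, d_model)
out = layer(x)
print(out.shape)               # torch.Size([2, 128, 768])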
2.2 Pretraining in Practice: From Data to Model
2.2.1 The Data Preprocessing Pipeline
import random

class PretrainDataProcessor:
    def __init__(self, tokenizer, max_length=2048):
        self.tokenizer = tokenizer
        self.max_length = max_length

    def process_text(self, example):
        """Text cleaning and formatting."""
        text = example['text']
        # 1. Strip unwanted characters
        text = self._clean_text(text)
        # 2. (Optional) sentence segmentation could be inserted here
        # 3. Tokenize and generate a dynamic MLM mask
        tokens = self.tokenizer.encode(text, truncation=False)
        masked_tokens, labels = self._create_mlm_mask(tokens)
        return {
            'input_ids': masked_tokens[:self.max_length],
            'attention_mask': [1] * min(len(masked_tokens), self.max_length),
            'labels': labels[:self.max_length]
        }

    def _clean_text(self, text):
        """Placeholder cleaning step; extend with project-specific rules."""
        return " ".join(text.split())

    def _create_mlm_mask(self, tokens):
        """BERT-style dynamic masking."""
        labels = [-100] * len(tokens)  # -100 is ignored by cross-entropy loss
        masked_tokens = tokens.copy()
        # Pick mask positions (skip the special tokens at both ends)
        num_to_mask = min(max(1, int(0.15 * len(tokens))), max(0, len(tokens) - 2))
        mask_indices = random.sample(range(1, len(tokens) - 1), num_to_mask)
        for idx in mask_indices:
            if random.random() < 0.8:
                # 80%: replace with [MASK]
                masked_tokens[idx] = self.tokenizer.mask_token_id
            elif random.random() < 0.5:
                # 10% overall: replace with a random token (skip low IDs reserved for special tokens)
                masked_tokens[idx] = random.randint(100, len(self.tokenizer) - 1)
            # remaining 10% overall: keep the original token
            labels[idx] = tokens[idx]
        return masked_tokens, labels
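Wiring the processor into a datasets pipeline might look like this (toy one-example dataset for illustration):

from transformers import AutoTokenizer
from datasets import Dataset

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
processor = PretrainDataProcessor(tokenizer, max_length=128)

raw = Dataset.from_dict({"text": ["SWIFT makes large-model training approachable."]})
train_dataset = raw.map(processor.process_text, remove_columns=["text"])
print(train_dataset[0]["input_ids"][:10])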
2.2.2 Distributed Training Configuration
# config/pretrain_config.yaml
trainer:
  num_train_epochs: 10
  per_device_train_batch_size: 32
  gradient_accumulation_steps: 4
  learning_rate: 1e-4
  warmup_steps: 2000
  weight_decay: 0.01
  fp16: true                  # mixed-precision training
model:
  hidden_size: 768
  num_hidden_layers: 12
  num_attention_heads: 12
  intermediate_size: 3072
  max_position_embeddings: 2048
deepspeed:                    # DeepSpeed ZeRO-2 configuration
  zero_optimization:
    stage: 2
    offload_optimizer:
      device: cpu
    allgather_partitions: true
    reduce_scatter: true
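One number worth checking before launch is the effective batch size implied by this config. With the values above on, say, 8 GPUs:

$$\text{effective batch} = 32\ (\text{per device}) \times 4\ (\text{grad. accum.}) \times 8\ (\text{GPUs}) = 1024$$

The learning rate and warmup_steps should be tuned against this number, not the per-device batch size.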
2.2.3 An Example Training Script
# scripts/run_pretrain.py
from swift import SwiftModel, SwiftTrainer
from transformers import AutoModelForMaskedLM, TrainingArguments

def run_pretraining(train_dataset, eval_dataset, data_collator):
    # 1. Load the model (the tokenizer is handled by the data pipeline above)
    model = AutoModelForMaskedLM.from_pretrained(
        "bert-base-uncased",
        cache_dir="./models"
    )
    # 2. Wrap it with SWIFT
    swift_model = SwiftModel(model)
    # 3. Configure training arguments
    training_args = TrainingArguments(
        output_dir="./output",
        num_train_epochs=10,
        per_device_train_batch_size=32,
        gradient_accumulation_steps=4,
        learning_rate=1e-4,
        fp16=True,
        deepspeed="./config/deepspeed_config.json",
        logging_steps=100,
        save_steps=1000,
        evaluation_strategy="steps",
        eval_steps=500
    )
    # 4. Create the trainer
    trainer = SwiftTrainer(
        model=swift_model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=data_collator,
    )
    # 5. Train
    trainer.train()
    # 6. Save the final model
    trainer.save_model("./final_model")
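The deepspeed_config.json referenced above is not shown in the original. A minimal ZeRO-2 file consistent with the YAML earlier might look like the following sketch; the "auto" values defer to TrainingArguments via the HuggingFace integration:

# scripts/write_deepspeed_config.py — illustrative; adjust to your cluster
import json

ds_config = {
    "train_micro_batch_size_per_gpu": "auto",
    "gradient_accumulation_steps": "auto",
    "fp16": {"enabled": "auto"},
    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {"device": "cpu"},
        "allgather_partitions": True,
        "reduce_scatter": True,
        "overlap_comm": True
    }
}

with open("./config/deepspeed_config.json", "w") as f:
    json.dump(ds_config, f, indent=2)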
3. Efficient Fine-Tuning: Adapting Large Models to Specific Tasks
3.1 Comparing Fine-Tuning Approaches
3.1.1 Full Fine-Tuning vs. Parameter-Efficient Fine-Tuning
Full fine-tuning:
- Pros: highest performance ceiling; fully adapts the model to the downstream task
- Cons: high compute cost, large storage footprint, prone to overfitting
Parameter-efficient fine-tuning (PEFT):
- Pros: low compute cost, storage-efficient, mitigates catastrophic forgetting
- Cons: performance may fall slightly short of full fine-tuning
3.1.2 LoRA in Detail
The core idea of LoRA (Low-Rank Adaptation) is that the weight updates of a large model are effectively low-rank and can therefore be approximated by a low-rank decomposition.
Mathematically:
- Original forward pass: h = W₀x
- LoRA forward pass: h = W₀x + ΔWx = W₀x + BAx
where:
- W₀ ∈ ℝ^{d×k} (pretrained weights, frozen)
- B ∈ ℝ^{d×r} (low-rank matrix, trainable)
- A ∈ ℝ^{r×k} (low-rank matrix, trainable)
- r ≪ min(d, k) (the rank is far smaller than the original dimensions)
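A quick worked example shows why this matters. For a single 4096×4096 projection (d = k = 4096) with rank r = 16:

$$\underbrace{d \times k}_{\Delta W\ \text{full}} = 16{,}777{,}216 \quad\text{vs.}\quad \underbrace{(d + k)\,r}_{\text{LoRA}} = (4096 + 4096) \times 16 = 131{,}072$$

That is roughly 0.8% of a full update for that matrix, which is where LoRA's memory savings come from.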
# How LoRA works, as code
import math
import torch
import torch.nn as nn

class LoRALayer(nn.Module):
    def __init__(self, base_layer, rank=8, alpha=16, dropout=0.1):
        super().__init__()
        self.base_layer = base_layer  # original layer, frozen
        for p in self.base_layer.parameters():
            p.requires_grad_(False)
        # LoRA parameters: a Linear weight is (out_features, in_features) = (d, k)
        d, k = base_layer.weight.shape
        self.lora_A = nn.Parameter(torch.zeros(rank, k))
        self.lora_B = nn.Parameter(torch.zeros(d, rank))
        self.scaling = alpha / rank
        self.dropout = nn.Dropout(dropout)
        # Initialization: A random, B zero, so ΔW = BA starts at zero
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)

    def forward(self, x):
        # Frozen base forward pass
        base_output = self.base_layer(x)
        # Low-rank adaptation path
        lora_output = (self.dropout(x) @ self.lora_A.T @ self.lora_B.T) * self.scaling
        return base_output + lora_output
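Wrapping a linear layer and checking the trainable-parameter split (illustrative):

base = nn.Linear(4096, 4096)
lora = LoRALayer(base, rank=16, alpha=32)
x = torch.randn(2, 4096)
print(lora(x).shape)  # torch.Size([2, 4096])
trainable = sum(p.numel() for p in lora.parameters() if p.requires_grad)
print(trainable)      # 131072 — only the two LoRA matrices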
3.2 Fine-Tuning with SWIFT
3.2.1 The Complete Instruction-Tuning Workflow
# scripts/finetune_lora.py
import torch
from swift import Swift, LoRAConfig
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer
from datasets import load_dataset

def instruction_finetuning():
    # 1. Load the base model (Qwen-7B as an example)
    model = AutoModelForCausalLM.from_pretrained(
        "Qwen/Qwen-7B",
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True
    )
    tokenizer = AutoTokenizer.from_pretrained(
        "Qwen/Qwen-7B",
        trust_remote_code=True
    )
    tokenizer.pad_token = tokenizer.eos_token
    # 2. Configure LoRA
    lora_config = LoRAConfig(
        r=16,                 # rank
        lora_alpha=32,        # scaling factor
        target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],  # modules to adapt
        lora_dropout=0.1,
        bias="none"
    )
    # 3. Wrap the model with SWIFT
    model = Swift.prepare_model(model, lora_config)
    # 4. Prepare the instruction data (ChatML format)
    def format_instruction(example):
        prompt = f"<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
        prompt += f"<|im_start|>user\n{example['instruction']}<|im_end|>\n"
        prompt += f"<|im_start|>assistant\n{example['output']}<|im_end|>"
        encoding = tokenizer(
            prompt,
            truncation=True,
            max_length=512,
            padding="max_length"
        )
        # Build labels: compute the loss on the assistant turn only
        labels = encoding["input_ids"].copy()
        # Mask everything before the assistant turn with -100 (ignored by the loss)
        assistant_start = prompt.find("<|im_start|>assistant")
        prefix_len = len(tokenizer.encode(prompt[:assistant_start]))
        labels[:prefix_len] = [-100] * prefix_len
        # Also ignore padding positions
        labels = [l if m == 1 else -100 for l, m in zip(labels, encoding["attention_mask"])]
        encoding["labels"] = labels
        return encoding
    dataset = load_dataset("Alpaca-GPT4-zh", split="train")  # dataset ID as in the original; substitute your own
    dataset = dataset.map(format_instruction, batched=False)
    # 5. Training arguments
    training_args = TrainingArguments(
        output_dir="./lora_output",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=8,
        learning_rate=2e-4,
        fp16=True,
        logging_steps=10,
        save_steps=100,
        save_total_limit=3,
        remove_unused_columns=False,
        push_to_hub=False,
        report_to="none"
    )
    # 6. Create the trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=dataset,
        data_collator=lambda data: {
            'input_ids': torch.stack([torch.tensor(d['input_ids']) for d in data]),
            'attention_mask': torch.stack([torch.tensor(d['attention_mask']) for d in data]),
            'labels': torch.stack([torch.tensor(d['labels']) for d in data])
        }
    )
    # 7. Train
    trainer.train()
    # 8. Save the LoRA weights only
    model.save_pretrained("./lora_weights")
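A useful sanity check before launching a PEFT run is to confirm that only the adapter weights are trainable. This small helper is not part of SWIFT; it works on any PyTorch module:

def print_trainable_parameters(model):
    """Report how many parameters will actually receive gradients."""
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total = sum(p.numel() for p in model.parameters())
    print(f"trainable: {trainable:,} / {total:,} ({100 * trainable / total:.2f}%)")

For a 7B model with the LoRA config above, expect a figure well under 1%.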
3.2.2 QLoRA: Memory-Efficient Fine-Tuning
# QLoRA configuration (LoRA on top of a quantized base model)
import torch
from swift import Swift, QuantizationConfig, QLoRAConfig
from transformers import AutoModelForCausalLM

def setup_qlora():
    # Configure 4-bit quantization
    quantization_config = QuantizationConfig(
        quantization_bit=4,                     # 4-bit weights
        quantization_method="nf4",              # NormalFloat4 quantization
        bnb_4bit_compute_dtype=torch.float16    # compute in fp16
    )
    # QLoRA settings
    qlora_config = QLoRAConfig(
        r=64,
        lora_alpha=16,
        target_modules=["q_proj", "v_proj"],
        quantization_config=quantization_config
    )
    # Load the model quantized, then attach the QLoRA adapters
    model = AutoModelForCausalLM.from_pretrained(
        "Qwen/Qwen-14B",
        quantization_config=quantization_config,
        device_map="auto",
        torch_dtype=torch.float16
    )
    model = Swift.prepare_model(model, qlora_config)
    return model
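For comparison, and independent of SWIFT's wrapper classes, the same 4-bit NF4 load can be expressed with the stock HuggingFace bitsandbytes integration; this sketch assumes bitsandbytes is installed:

import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",             # NormalFloat4
    bnb_4bit_compute_dtype=torch.float16,  # dequantized compute dtype
    bnb_4bit_use_double_quant=True         # quantize the quantization constants too
)
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-14B",
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True
)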
4. Inference and Sampling: Key Techniques for Generating High-Quality Text
4.1 A Deep Dive into the Inference Process
4.1.1 The Autoregressive Generation Process
import torch
import torch.nn.functional as F

class AutoregressiveGenerator:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def generate(self, prompt, max_length=100, **kwargs):
        # Encode the prompt
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
        # Autoregressive decoding loop
        generated = input_ids
        for _ in range(max_length):
            # Forward pass
            with torch.no_grad():
                outputs = self.model(generated)
            next_token_logits = outputs.logits[:, -1, :]
            # Sampling strategy
            next_token_id = self._sample_next_token(
                next_token_logits,
                **kwargs
            )
            # Append the new token (shape it to [1, 1] before concatenating)
            generated = torch.cat([
                generated,
                next_token_id.view(1, 1)
            ], dim=-1)
            # Stop on end-of-sequence
            if next_token_id.item() == self.tokenizer.eos_token_id:
                break
        return self.tokenizer.decode(generated[0])

    def _sample_next_token(self, logits, temperature=1.0, top_k=50, top_p=0.9):
        """Several sampling strategies combined."""
        # Temperature scaling
        logits = logits / temperature
        # Top-k filtering: keep only the k highest-probability tokens
        if top_k > 0:
            indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
            logits[indices_to_remove] = -float('Inf')
        # Top-p (nucleus) filtering: keep the smallest set whose cumulative probability exceeds p
        if top_p < 1.0:
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            cumulative_probs = torch.cumsum(
                F.softmax(sorted_logits, dim=-1),
                dim=-1
            )
            sorted_indices_to_remove = cumulative_probs > top_p
            # Shift right so the first token above the threshold is kept
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0
            indices_to_remove = sorted_indices_to_remove.scatter(
                1, sorted_indices, sorted_indices_to_remove
            )
            logits[indices_to_remove] = -float('Inf')
        # Sample from the filtered distribution
        probs = F.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)
        return next_token.squeeze()
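Assuming the model and tokenizer loaded earlier, usage is straightforward (hypothetical prompt):

generator = AutoregressiveGenerator(model, tokenizer)
text = generator.generate(
    "Explain LoRA in one sentence:",
    max_length=64,
    temperature=0.8,
    top_k=50,
    top_p=0.9
)
print(text)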
4.2 Sampling Strategies in Detail
4.2.1 Greedy Search vs. Beam Search

import torch
import torch.nn.functional as F

def beam_search_decoding(model, input_ids, eos_token_id, beam_width=4, max_length=50):
    """A beam search implementation."""
    # Initialize the beam set
    beams = [{
        'sequence': input_ids,
        'score': 0.0,
        'finished': False
    }]
    for step in range(max_length):
        candidates = []
        for beam in beams:
            if beam['finished']:
                candidates.append(beam)
                continue
            # Logits for the current sequence
            with torch.no_grad():
                outputs = model(beam['sequence'])
            next_token_logits = outputs.logits[:, -1, :]
            next_token_probs = F.log_softmax(next_token_logits, dim=-1)
            # Expand each beam with its top-k continuations
            topk_probs, topk_indices = torch.topk(next_token_probs, beam_width)
            for i in range(beam_width):
                new_sequence = torch.cat([
                    beam['sequence'],
                    topk_indices[:, i:i+1]
                ], dim=-1)
                new_score = beam['score'] + topk_probs[:, i].item()
                new_beam = {
                    'sequence': new_sequence,
                    'score': new_score,
                    'finished': (topk_indices[:, i] == eos_token_id).item()
                }
                candidates.append(new_beam)
        # Keep the beam_width highest-scoring sequences
        candidates.sort(key=lambda x: x['score'], reverse=True)
        beams = candidates[:beam_width]
        # Stop once every beam has finished
        if all(beam['finished'] for beam in beams):
            break
    # Return the best sequence
    best_beam = max(beams, key=lambda x: x['score'])
    return best_beam['sequence']
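Because log-probabilities are negative, raw beam scores favor short sequences. A common remedy (not included in the sketch above) is GNMT-style length normalization, dividing each beam's score by a length penalty:

$$\text{score}(Y) = \frac{\log P(Y \mid X)}{lp(Y)}, \qquad lp(Y) = \frac{(5 + |Y|)^{\alpha}}{(5 + 1)^{\alpha}}$$

with α typically between 0.6 and 1.0.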
4.2.2 Temperature Sampling and Diversity Control

from collections import Counter

def controlled_sampling(logits, temperature=1.0, repetition_penalty=1.2,
                        presence_penalty=0.0, frequency_penalty=0.0,
                        previous_tokens=None):
    """Sampling with penalty mechanisms."""
    # Temperature scaling (clamped to avoid division by zero)
    logits = logits / max(temperature, 1e-8)
    # Repetition penalty (CTRL-style): shrink the logit of every previously seen token
    if repetition_penalty != 1.0 and previous_tokens is not None:
        for token in set(previous_tokens):
            if logits[token] < 0:
                logits[token] *= repetition_penalty
            else:
                logits[token] /= repetition_penalty
    # Presence and frequency penalties
    if (presence_penalty != 0.0 or frequency_penalty != 0.0) and previous_tokens is not None:
        token_counts = Counter(previous_tokens)
        for token, count in token_counts.items():
            penalty = presence_penalty + count * frequency_penalty
            logits[token] -= penalty
    return logits
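The effect of temperature is easiest to see in the softmax itself: with temperature T, token i is sampled with probability

$$p_i = \frac{\exp(z_i / T)}{\sum_j \exp(z_j / T)}$$

so T < 1 sharpens the distribution toward the argmax (approaching greedy decoding as T → 0), while T > 1 flattens it and increases diversity.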
4.3 Inference with SWIFT
4.3.1 Model Loading and Inference Optimization
# scripts/inference_optimized.py
import torch
from swift import SwiftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import vllm  # high-performance inference engine

class OptimizedInference:
    def __init__(self, model_path, lora_path=None, use_vllm=False):
        self.use_vllm = use_vllm
        if use_vllm:
            # High-throughput inference via vLLM
            self.model = vllm.LLM(
                model=model_path,
                tokenizer=model_path,
                tensor_parallel_size=2,      # tensor parallelism
                gpu_memory_utilization=0.9,
                max_model_len=4096,
                enable_prefix_caching=True   # prefix caching speeds up shared prompts
            )
        else:
            # Standard HuggingFace loading
            self.model = AutoModelForCausalLM.from_pretrained(
                model_path,
                torch_dtype=torch.float16,
                device_map="auto",
                trust_remote_code=True
            )
            # Attach a LoRA adapter if provided
            if lora_path:
                self.model = SwiftModel.from_pretrained(
                    self.model,
                    lora_path,
                    device_map="auto"
                )
            self.model.eval()
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True
        )
    def stream_generate(self, prompt, max_length=512, **kwargs):
        """Streaming generation with a KV cache."""
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
        if torch.cuda.is_available():
            input_ids = input_ids.cuda()
        # Reuse past key/values so each step only processes the newest token
        past_key_values = None
        generated = input_ids
        for i in range(max_length):
            with torch.no_grad():
                outputs = self.model(
                    input_ids=generated if past_key_values is None else generated[:, -1:],
                    past_key_values=past_key_values,
                    use_cache=True
                )
            past_key_values = outputs.past_key_values
            next_token_logits = outputs.logits[:, -1, :]
            # Sample the next token
            next_token_id = self._sample_token(next_token_logits, **kwargs)
            # Extend the sequence
            generated = torch.cat([generated, next_token_id], dim=-1)
            # Decode and yield the new text
            new_text = self.tokenizer.decode(next_token_id[0])
            yield new_text
            # Stop on end-of-sequence
            if next_token_id.item() == self.tokenizer.eos_token_id:
                break

    def _sample_token(self, logits, temperature=1.0):
        """Minimal temperature sampling; swap in the richer strategies from 4.1.1 as needed."""
        probs = torch.softmax(logits / max(temperature, 1e-8), dim=-1)
        return torch.multinomial(probs, num_samples=1)  # shape (1, 1)
    def batch_inference(self, prompts, batch_size=4):
        """Batched inference."""
        results = []
        for i in range(0, len(prompts), batch_size):
            batch_prompts = prompts[i:i+batch_size]
            # Batch-encode with padding
            encodings = self.tokenizer(
                batch_prompts,
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt"
            )
            if torch.cuda.is_available():
                encodings = {k: v.cuda() for k, v in encodings.items()}
            # Batch generation
            with torch.no_grad():
                outputs = self.model.generate(
                    **encodings,
                    max_new_tokens=256,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9
                )
            # Decode the results
            batch_results = self.tokenizer.batch_decode(
                outputs,
                skip_special_tokens=True
            )
            results.extend(batch_results)
        return results
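Putting the class to work (model and adapter paths are hypothetical):

engine = OptimizedInference("Qwen/Qwen-7B", lora_path="./lora_weights")
# Streamed output, token by token
for chunk in engine.stream_generate("Summarize the SWIFT framework:", temperature=0.7):
    print(chunk, end="", flush=True)
# Or batched completion
print(engine.batch_inference(["What is LoRA?", "What is QLoRA?"]))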
4.3.2 Inference Optimization Tricks
# inference_optimizations.py
import torch

class InferenceOptimizer:
    @staticmethod
    def apply_kv_cache_optimization(model, max_batch_size=8):
        """Enable KV-cache-friendly settings."""
        # Reuse key/value states across decoding steps
        model.config.use_cache = True
        model.config.pad_token_id = model.config.eos_token_id
        # Cap the cache length (illustrative; the exact knob varies by model)
        model.config.max_cache_len = 2048
        return model

    @staticmethod
    def apply_quantization(model, quantization_bits=8):
        """Apply dynamic quantization."""
        if quantization_bits == 8:
            # Dynamic int8 quantization of linear layers (mainly for CPU inference)
            from torch.quantization import quantize_dynamic
            model = quantize_dynamic(
                model,
                {torch.nn.Linear},
                dtype=torch.qint8
            )
        elif quantization_bits == 4:
            # 4-bit quantization is normally applied at load time via a
            # quantization_config (see 3.2.2); casting to fp16 here is only a fallback
            model = model.to(torch.float16)
        return model

    @staticmethod
    def compile_model(model):
        """Speed up with torch.compile (PyTorch 2.x)."""
        if hasattr(torch, 'compile'):
            model = torch.compile(
                model,
                mode="reduce-overhead",
                fullgraph=True,
                dynamic=True
            )
        return model
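These passes compose; a typical (illustrative) order applies the cache settings first, then compilation:

model = InferenceOptimizer.apply_kv_cache_optimization(model)
model = InferenceOptimizer.compile_model(model)
# Dynamic int8 quantization is mainly useful for CPU deployment:
# model = InferenceOptimizer.apply_quantization(model, quantization_bits=8)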