基于 PyTorch 自建 AI 大模型
一、千万参数大模型核心源码(PyTorch实现):基于Transformer解码器架构,默认配置下参数量约为2900万(可通过调整维度/层数缩放模型规模),支持文本自回归生成,代码精简且易理解。
·
一、千万参数大模型核心源码(PyTorch实现):基于Transformer解码器架构,默认配置下参数量约为2900万(可通过调整维度/层数缩放模型规模),支持文本自回归生成,代码精简且易理解。Python 代码如下:
import math
from dataclasses import dataclass

import torch
import torch.nn as nn
import torch.nn.functional as F
# 1. Configuration class: a single place for all model hyperparameters.
@dataclass
class ModelConfig:
    """Hyperparameters of the decoder-only Transformer model."""

    # Vocabulary size (simplified; a BPE tokenizer could be used to extend it).
    vocab_size: int = 10000
    # Hidden (embedding) dimension of the model.
    d_model: int = 512
    # Number of attention heads.
    n_heads: int = 8
    # Number of stacked Transformer decoder layers.
    n_layers: int = 6
    # Inner dimension of the feed-forward network.
    d_ff: int = 2048
    # Dropout probability.
    dropout: float = 0.1
    # Maximum sequence length.
    max_seq_len: int = 128
# 2. Multi-head attention layer
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention with a fused Q/K/V projection.

    Args:
        config: Object exposing ``d_model``, ``n_heads`` and ``dropout``.

    Raises:
        AssertionError: If ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, config):
        super().__init__()
        assert config.d_model % config.n_heads == 0, "d_model必须能被n_heads整除"
        self.d_k = config.d_model // config.n_heads  # per-head dimension
        self.n_heads = config.n_heads
        # A single linear layer produces Q, K and V (output width = 3 * d_model).
        self.w_qkv = nn.Linear(config.d_model, config.d_model * 3)
        # Output projection applied after the heads are merged.
        self.w_o = nn.Linear(config.d_model, config.d_model)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: Tensor of shape ``[batch, seq_len, d_model]``.

        Returns:
            Tensor of shape ``[batch, seq_len, d_model]``.
        """
        batch_size, seq_len, _ = x.shape

        # 1. Project to Q, K, V and split heads:
        #    [batch, seq_len, d_model] -> [batch, n_heads, seq_len, d_k]
        q, k, v = (
            t.view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
            for t in self.w_qkv(x).chunk(3, dim=-1)
        )

        # 2. Scaled dot-product scores. A Python float scale avoids building a
        #    fresh CPU tensor on every call.
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)

        # 3. Causal mask: the strict upper triangle marks future positions.
        #    -inf (rather than -1e9) is representable in every float dtype,
        #    and each row keeps its diagonal entry, so softmax never sees an
        #    all-masked row.
        mask = torch.triu(
            torch.ones(seq_len, seq_len, device=x.device), diagonal=1
        ).bool()
        scores = scores.masked_fill(mask, float("-inf"))
        attn_weights = self.dropout(F.softmax(scores, dim=-1))

        # 4. Merge heads: [batch, n_heads, seq_len, d_k] -> [batch, seq_len, d_model]
        attn_output = torch.matmul(attn_weights, v).transpose(1, 2).contiguous()
        attn_output = attn_output.view(batch_size, seq_len, -1)
        return self.w_o(attn_output)
# 3. Feed-forward network layer
class FeedForwardNetwork(nn.Module):
    """Position-wise feed-forward network: Linear -> GELU -> Dropout -> Linear.

    Args:
        config: Object exposing ``d_model``, ``d_ff`` and ``dropout``.
    """

    def __init__(self, config):
        super().__init__()
        self.fc1 = nn.Linear(config.d_model, config.d_ff)
        self.fc2 = nn.Linear(config.d_ff, config.d_model)
        self.dropout = nn.Dropout(config.dropout)
        self.gelu = nn.GELU()  # activation function

    def forward(self, x):
        """Map a tensor with last dimension ``d_model`` back to ``d_model``."""
        hidden = self.gelu(self.fc1(x))
        return self.fc2(self.dropout(hidden))
# 4. Transformer decoder block
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention and feed-forward, each on a residual path.

    Layer normalization is applied to the input of each sub-layer (Pre-LN),
    and the sub-layer output goes through dropout before being added back.

    Args:
        config: Passed through to ``MultiHeadAttention`` and
            ``FeedForwardNetwork``; must also expose ``d_model`` and
            ``dropout``.
    """

    def __init__(self, config):
        super().__init__()
        self.self_attn = MultiHeadAttention(config)
        self.ffn = FeedForwardNetwork(config)
        self.norm1 = nn.LayerNorm(config.d_model)
        self.norm2 = nn.LayerNorm(config.d_model)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        # Residual connection around: norm -> attention -> dropout
        x = x + self.dropout(self.self_attn(self.norm1(x)))
        # Residual connection around: norm -> feed-forward -> dropout
        x = x + self.dropout(self.ffn(self.norm2(x)))
        return x
# 5. Full model (Transformer decoder)
class MillionScaleModel(nn.Module):
    """Decoder-only Transformer language model.

    Token embeddings plus a learned positional parameter are passed through
    ``n_layers`` decoder blocks, a final LayerNorm and a linear projection
    onto the vocabulary.

    Args:
        config: Object exposing ``vocab_size``, ``d_model``, ``max_seq_len``,
            ``n_layers`` and ``dropout`` (plus whatever ``DecoderBlock``
            reads from it).
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        # Token embedding: token id -> vector.
        self.embedding = nn.Embedding(config.vocab_size, config.d_model)
        # Learned positional encoding, one vector per position.
        self.pos_encoding = nn.Parameter(
            torch.randn(1, config.max_seq_len, config.d_model)
        )
        # Dropout applied to the embedded input; forward() relies on it.
        self.dropout = nn.Dropout(config.dropout)
        # Stack of decoder blocks.
        self.decoder_layers = nn.ModuleList(
            [DecoderBlock(config) for _ in range(config.n_layers)]
        )
        # Output head: final normalization, then projection to vocabulary logits.
        self.norm_final = nn.LayerNorm(config.d_model)
        self.fc_out = nn.Linear(config.d_model, config.vocab_size)
        # Re-initialize Linear / Embedding weights of all submodules.
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Xavier-init Linear weights (zero bias); normal(std=0.02) for Embedding."""
        if isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Embedding):
            nn.init.normal_(m.weight, std=0.02)

    def forward(self, input_ids):
        """Compute next-token logits.

        Args:
            input_ids: Integer tensor of token ids, shape ``[batch, seq_len]``.

        Returns:
            Logits tensor of shape ``[batch, seq_len, vocab_size]``.

        Raises:
            ValueError: If ``seq_len`` exceeds ``config.max_seq_len``.
        """
        seq_len = input_ids.size(1)
        max_len = self.pos_encoding.size(1)
        if seq_len > max_len:
            # Slicing pos_encoding would silently return fewer positions and
            # the addition below would fail with an opaque broadcast error.
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {max_len}"
            )

        # 1. Embedding + positional encoding (broadcast over the batch).
        x = self.embedding(input_ids)  # [batch, seq_len, d_model]
        x = x + self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x)

        # 2. Run through every decoder block.
        for layer in self.decoder_layers:
            x = layer(x)

        # 3. Output head.
        x = self.norm_final(x)
        return self.fc_out(x)  # [batch, seq_len, vocab_size]
6. 模型使用示例(训练+生成)if name == “main”: # 初始化配置和模型 config = ModelConfig() model = MillionScaleModel(config) # 计算模型参数总量(约1200万) total_params = sum(p.numel() for p in model.parameters()) print(f"模型参数总量:{total_params / 1e6:.2f}M") # 模拟训练数据(batch_size=2, seq_len=32) input_ids = torch.randint(0, config.vocab_size, (2, 32)) labels = torch.randint(0, config.vocab_size, (2, 32)) # 训练流程(简化版) optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) model.train() for epoch in range(3): optimizer.zero_grad() logits = model(input_ids) # 计算交叉熵损失(忽略padding token,此处简化为全序列计算) loss = F.cross_entropy(logits.reshape(-1, config.vocab_size), labels.reshape(-1)) loss.backward() optimizer.step() print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}") # 文本生成示例(自回归生成) model.eval() start_token = torch.tensor([[1]]) # 起始token(假设1是) generated = start_token with torch.no_grad(): for _ in range(20): # 生成20个token logits = model(generated) next_token =
-____________________
完整代码和案例请关注微信公众号:颇锐克科技共享
-______________________
更多AI,GPU,Linux,Android,芯片行业技术分享请关注公众号:颇锐克科技共享。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)