自然语言处理——基于预训练模型的方法——第4章 自然语言处理中的神经网络基础
自然语言处理——基于预训练模型的方法——第4章 自然语言处理中的神经网络基础
《自然语言处理——基于预训练模型的方法》——车万翔、郭江、崔一鸣
自然语言处理——基于预训练模型的方法——第4章 自然语言处理中的神经网络基础
4.1 多层感知器模型
4.1.1 感知器(Perceptron)
假设有n 位面试官,每人的打分分别为 x1, x2,···, xn,则总分s=x1+x2+···+xn,如果s≥t,则给予录用,其中t被称为阈值,x1, x2,···, xn被称为输入,可以使用向量x=[x1, x2,···, xn]表示。然而,在这些面试官中,有一些经验比较丰富,而有一些则是刚入门的新手,如果简单地将它们的打分进行相加,最终的得分显然不够客观,因此可以通过对面试官的打分进行加权的方法解决,即为经验丰富的面试官赋予较高的权重,而为新手赋予较低的权重。假设n位面试官的权重分别为w1, w2,···, wn,则最终的分数为s=w1x1+w2x2+···+wnxn,同样可以使用向量w=[w1, w2,···, wn]表示n个权重,则分数可以写成权重向量和输入向量的点积,即s=w · x,于是最终的输出y为:

式中,输出y=1表示录用,y=0表示不录用。这就是感知器模型,其还可以写成以下的形式:

式中,b=−t,又被称为偏差项(Bias)。
使用时的问题:
-
特征提取
如何将一个问题的原始输入(Raw Input)转换成输入向量x
-
合理设置权重w和偏差项b(模型参数)
4.1.2 线性回归
感知器是一个分类模型,即输出结果为离散的类别(如褒义或贬义)。
线性回归(Linear Regression)是最简单的回归模型。与感知器类似,线性回归模型将输出y建模为对输入x中各个元素的线性加权和,最后也可以再加上偏差项b,即y=w1x1+w2x2+···+wnxn+b=w · x+b。
4.1.3 Logistic回归
线性回归输出值的大小(值域)是任意的,有时需要将其限制在一定的范围内。
很多函数能够实现此功能,它们又被称为激活函数(Activation Function),其中Logistic函数经常被用到,其形式为:

该函数能将y的值限制在0(z→ −∞)到L(z→+∞)之间,当z= z0时,y=L/2;k控制了函数的陡峭程度。若z=w1x1+w2x2+···+wnxn+b,此模型又被称为Logistic回归(Logistic Regression)模型。
如果将Logistic函数中的参数进行如下设置,L=1、k=1、z0=0,此时函数形式为:

该函数又被称为Sigmoid函数。→ Sigmod回归 值域恰好在0∼1之间 → 二元分类
4.1.4 Softmax回归
对第i个类别使用线性回归打一个分数,zi=wi1x1+wi2x2+···+winxn+bi。式中,wij表示第i个类别对应的第j个输入的权重。然后,对多个分数使用指数函数进行归一化计算,并获得一个输入属于某个类别的概率。该方法又称Softmax回归。

式中,z 表示向量 [z1, z2,···, zm];m 表示类别数;yi表示第 i 个类别的概率。
当m = 2,变为Sigmod函数形式。
4.1.5 多层感知器
现实世界中很多真实的问题不都是线性可分的,即无法使用一条直线、平面或者超平面分割不同的类别,其中典型的例子是异或问题(Exclusive OR,XOR),即假设输入为x1和x2,如果它们相同,即当x1=0、x2=0或x1=1、x2=1时,输出y=0;如果它们不相同,即当x1=0、x2=1或x1=1、x2=0时,输出y=1。

无法使用线性分类器恰当地将输入划分到正确的类别。
多层感知器(Multi-layer Perceptron,MLP)是解决线性不可分问题的一种解决方案。
多层感知器指的是堆叠多层线性分类器,并在中间层(也叫隐含层,Hid-denlayer)增加非线性激活函数。

ReLU(Rectified Linear Unit)是一种非线性激活函数,其定义为当某一项输入小于0时,输出为0;否则输出相应的输入值,即ReLU(z)=max(0, z)。
一般来讲,隐含层越大、层数越多,即模型的参数越多、容量越大,多层感知器的表达能力就越强,但是此时较难优化网络的参数。而如果隐含层太小、层数过少,则模型的表达能力会不足。
4.1.6 模型实现
-
神经网络层与激活函数
创建线性层(线性映射模型)

in_features 是输入特征的数目,out_features 是输出特征的数目
linear = nn.Linear(32,2) #输入32维,输出2维
inputs = torch.rand(3,32) #形状(3,32)随机张量,3为批次大小
outputs = linear(inputs) #输出张量形状为(3,2)
print(outputs)
实际调用线性层时,可以一次性输入多个样例,一般叫作一个批次(Batch),并同时获得每个样例的输出。
如果输入张量的形状是 (batch,in_features),则输出张量的形状是(batch,out_features)。
Sigmoid、Softmax等各种激活函数包含在torch.nn.functional中,实现对输入按元素进行非线性运算。
activation = torch.sigmoid(outputs)
print(activation)
-
自定义神经网络模型
在 PyTorch 中构建一个自定义神经网络模型非常简单,就是从torch.nn中的Module类派生一个子类,并实现构造函数和forward函数。
多层感知器模型:
import torch from torch import nn class MLP(nn.Module): def __init__(self,input_dim,hidden_dim,num_class): super(MLP,self).__init__() # 线性变换:输入层 → 隐藏层 self.linear1 = nn.Linear(input_dim,hidden_dim) # 使用relu激活函数 self.activate = torch.relu # 线性变换:隐藏层 → 输出层 self.linear2 = nn.Linear(hidden_dim,num_class) def forward(self,inputs): hidden = self.linear1(inputs) activation = self.activate(hidden) outputs = self.linear2(activation) probs = torch.softmax(outputs,dim=1) return probs mlp = MLP(input_dim=4,hidden_dim=5,num_class=2) inputs = torch.rand(3,4) #输入形状为(3,4)的张量,其中3表示有3个输入,4表示每个输入的维度 probs = mlp(inputs) print(probs)
4.2 卷积神经网络
4.2.1 模型结构
在多层感知器中,每层输入的各个元素都需要乘以一个独立的参数(权重),这一层又叫作全连接层(Fully Connected Layer)或稠密层(Dense Layer)。
对于某些类型的任务,这样做并不合适,例如图像识别任务、情感分类任务
→ 使用一个小的稠密层提取这些局部特征,如图像中固定大小的像素区域、文本中词的N-gram等。
为了解决关键信息位置不固定的问题,可以依次扫描输入的每个区域,该操作又被称为卷积(Con-volution)操作。
其中,每个小的、用于提取局部特征的稠密层又被称为卷积核(Kernel)或者滤波器(Filter)。
卷积操作输出的结果还可以进行进一步聚合,这一过程被称为池化(Pooling)操作。
池化:
-
最大池化
仅保留最有意义的局部特征
-
平均池化
-
加和池化
经过池化操作后,可以保证最终输出相同个数的特征。
在进行卷积操作时,可以使用多个卷积核提取不同种类的局部特征。
卷积核的构造方式:
- 使用不同组的参数,并且使用不同的初始化参数,获得不同的卷积核
- 提取不同尺度的局部特征
多个卷积核输出多个特征,再经过一个全连接的分类层就可以做出最终的决策。
最后,还可以将多个卷积层加池化层堆叠起来,形成更深层的网络,这些网络统称为卷积神经网络(Convolutional Neural Network,CNN)。

向滑动的卷积操作又叫作一维卷积,适用于自然语言等序列数据。
与多层感知器模型类似,卷积神经网络中的信息也是从输入层经过隐含层,然后传递给输出层,按照一个方向流动,因此它们都被称为前馈神经网络(FeedForward Network,FFN)。
4.2.2 模型实现
PyTorch的torch.nn包中使用Conv1d、Conv2d或Conv3d类实现卷积层,它们分别表示一维卷积、二维卷积和三维卷积。
一维卷积(Conv1d),其构造函数至少需要提供三个参数:
- in_channels为输入通道的个数,在输入层对应词向量的维度
- out_channels为输出通道的个数,对应卷积核的个数
- kernel_size为每个卷积核的宽度
当调用该Conv1d对象时,输入数据形状为(batch,in_channels,seq_len),输出数据形状为(batch,out_channels,seq_len),seq_len分别表示输入的序列长度和输出的序列长度
沿单一方向滑动的卷积操作又叫作一维卷积,适用于自然语言等序列数据
import torch
from torch.nn import Conv1d
from torch.nn import MaxPool1d #一维最大池化
conv1 = Conv1d(5,2,4) # 一维卷积,输入通道大小为5,输出通道大小为2,卷积核宽度为4
conv2 = Conv1d(5,2,3) #一维卷积,输入通道大小为5,输出通道大小为2,卷积核宽度为3
inputs = torch.rand(2,5,6) #输入数据批次为2,即有两个序列,每个序列的长度为6,维度为5
outputs1 = conv1(inputs)
outputs2 = conv2(inputs)
## 实例化池化层对象方式
# pool1 = MaxPool1d(3) #第1个池化层核的大小为3,即卷积层的输出序列长度
# pool2 = MaxPool1d(4) #第2个池化层核的大小为4
# outputs_pool1 = pool1(outputs1) # 执行一维最大池化操作,即取每行输入的最大值
# outputs_pool2 = pool1(outputs2)
##使用池化函数方式
outputs_pool1 = torch.max_pool1d(outputs1,kernel_size=outputs1.shape[2])
# outputs1的最后一维为其序列的长度
outputs_pool2 = torch.max_pool1d(outputs2,kernel_size=outputs2.shape[2])
# 拼接
# 先调用squeeze函数将最后一个为1的维度删除
outputs_pool_squeeze1 = outputs_pool1.squeeze(dim =2)
outputs_pool_squeeze2 = outputs_pool2.squeeze(dim = 2)
outputs_pool = torch.cat([outputs_pool_squeeze1,outputs_pool_squeeze2],dim=1)
#最后连接一个全连接层,完成分类功能
linear = torch.nn.Linear(4,2)
outputs_linear = linear(outputs_pool)
4.3 循环神经网络(Recurrent Neural Network,RNN)
多层感知器与卷积神经网络均为前馈神经网络,信息按照一个方向流动。
循环神经网络 → 信息循环流动
4.3.1 模型结构(原始的循环神经网络)
循环神经网络指的是网络的隐含层输出又作为其自身的输入。实际使用循环神经网络时,需要设定一个有限的循环次数,将其展开后相当于堆叠多个共享隐含层参数的前馈神经网络。
当使用循环神经网络处理一个序列输入时,需要将循环神经网络按输入时刻展开,然后将序列中的每个输入依次对应到网络不同时刻的输入上,并将当前时刻网络隐含层的输出也作为下一时刻的输入。

其中

激活函数

t是输入序列的当前时刻,其隐含层ht不但与当前的输入xt有关,而且与上一时刻的隐含层ht−1有关,这实际上是一种递归形式的定义。
每个时刻的输入经过层层递归,对最终的输出产生一定的影响,每个时刻的隐含层ht承载了1 ∼ t时刻的全部输入信息,因此循环神经网络中的隐含层也被称作记忆(Memory)单元。
在最后时刻产生输出结果,此时适用于处理文本分类问题。
每个时刻产生一个输出结果,这种结构适用于序列标注、词性标注、命名实体识别甚至分词。
4.3.2 长短时记忆网络(Long Short-Term Memory,LSTM)
在原始的循环神经网络中,信息是通过多个隐含层逐层传递到输出层的。直观上,这会导致信息的损失;更本质地,这会使得网络参数难以优化。
长短时记忆网络隐含层更新方式:

直接将hk与ht(k<t)进行了连接,跨过了中间的t−k层,从而减小了网络的层数,使得网络更容易被优化。
但是旧状态ht-1和新状态ut进行相加的更新方式过于粗糙,没有考虑两种状态对ht贡献的大小。
所以 →

σ表示Sigmoid函数,其输出恰好介于0到1之间,可作为加权求和的系数;
⊙表示Hardamard乘积,即按张量对应元素进行相乘;
ft被称作遗忘门(Forget gate),因为如果其较小时,旧状态ht−1对当前状态的贡献也较小,也就是将过去的信息都遗忘了。
但是现在的问题是旧状态和新状态的贡献互斥,总会一大一小,所以引入新的系数

新的系数it用于控制输入状态ut对当前状态的贡献,因此又被称作输入门(Input gate)。
对输出也增加门控机制,即输出门 →

ct又被称为记忆细胞(Memory cell),即存储(记忆)了截至当前时刻的重要信息。与原始的循环神经网络一样,既可以使用hn预测最终的输出结果,又可以使用ht预测每个时刻的输出结果。
无论是传统的循环神经网络还是LSTM,信息流动都是单向的,在一些应用中这并不合适,如对于词性标注任务,一个词的词性不但与其前面的单词及其自身有关,还与其后面的单词有关,但是传统的循环神经网络并不能利用某一时刻后面的信息。
-
双向循环神经网络或双向LSTM,简称Bi-RNN或Bi-LSTM。
思想是将同一个输入序列分别接入向前和向后两个循环神经网络中,然后再将两个循环神经网络的隐含层拼接在一起,共同接入输出层进行预测。
-
堆叠循环神经网络(Stacked RNN)
4.3.3 模型实现
PyTorch的torch.nn包实现,RNN类。
构造函数至少需要提供两个参数:input_size表示每个时刻输入的大小,hidden_size表示隐含层的大
小。通常将batch_first设为True(其默认值为False),即输入和输出的第1维代表批次的大小(即一次同时处理序列的数目)。
import torch
from torch.nn import RNN
# 定义一个RNN,每个时刻输入大小为4,隐含层大小为5
rnn = RNN(input_size=4,hidden_size=5,batch_first=True)
# 输入数据批次大小为2,即有两个序列,每个序列的长度为3,每个时刻输出大小为4
inputs = torch.rand(2,3,4)
# outputs 为输出序列的隐含层,hn为最后一个时刻的隐含层
outputs , hn = rnn(inputs)
print(outputs) #输出两个序列,每个序列长度为3,大小为5
print(hn) # 最后一个时刻的隐含层,值与outputs中最后一个时刻相同

初始化RNN时,还可通过设置其他参数修改网络的结构,如bidirectional=True(双向RNN,默认为False)、num_layers(堆叠的循环神经网络层数,默认为1)等。
torch.nn包中还提供了LSTM类,其初始化的参数以及输入数据与RNN相同,不同之处在于其输出数据除了最后一个时刻的隐含层hn,还输出了最后一个时刻的记忆细胞cn。
import torch
from torch.nn import LSTM
lstm = LSTM(input_size=4,hidden_size=5,batch_first=True)
inputs = torch.rand(2,3,4)
# outputs为输出序列的隐含层,hn为最后一个时刻的隐含层,cn为最后一个时刻的记忆细胞
outputs , (hn,cn) = lstm(inputs)
print(outputs)
print(hn)
print(cn)

4.3.4 基于循环神经网络的序列到序列模型
除了能够处理分类问题和序列标注问题,循环神经网络另一个强大的功能是能够处理序列到序列的理解和生成问题,相应的模型被称为序列到序列模型,也被称为编码器–解码器模型。
很多自然语言处理问题都可以看作序列到序列问题,如机器翻译,即首先对源语言的句子编码,然后生成相应的目标语言翻译。
一个基于序列到序列模型进行机器翻译的示例

首先编码器使用循环神经网络对源语言句子编码,然后以最后一个单词对应的隐含层作为初始,再调用解码器(另一个循环神经网络)逐词生成目标语言的句子。BOS:句子起始标记。
基于循环神经网络的序列到序列模型有一个基本假设,就是原始序列的最后一个隐含状态(一个向量)包含了该序列的全部信息。→ 不合理,为了解决 → 注意力模型。
4.4 注意力模型
4.4.1 注意力机制
为了解决序列到序列模型记忆长序列能力不足的问题,当要生成一个目标语言单词时,不光考虑前一个时刻的状态和已经生成的单词,还考虑当前要生成的单词和源语言句子中的哪些单词更相关,即更关注源语言的哪些词,这种做法就叫作注意力机制(Attention mechanism)。
注意力权重计算公式:

hs表示源序列中s时刻的状态;ht−1表示目标序列中前一个时刻的状态;attn是注意力计算公式,即通过两个输入状态的向量,计算一个源序列s时刻的注意力分数 ;,其中 L为源序列的长度;最后对整个源序列每个时刻的注意力分数使用Softmax函数进行归一化,获得最终的注意力权重αs。
attn的计算方式:
引入注意力机制后,基于RNN的序列到序列模型的准确率 ↑。
4.4.2 自注意力模型
当要表示序列中某一时刻的状态时,可以通过该状态与其他时刻状态之间的相关性(注意力)计算,即所谓的“观其伴、知其义”,这又被称作自注意力机制(Self-attention)。
通过自注意力机制,可以直接计算两个距离较远的时刻之间的关系。基于自注意力机制的自注意力模型已经逐步取代循环神经网络,成为自然语言处理的标准模型。
4.4.3 Transformer
要想真正取代循环神经网络,自注意力模型还需要解决如下问题:
- 在计算自注意力时,没有考虑输入的位置信息,因此无法对序列进行建模
- 输入向量xi同时承担了三种角色,即计算注意力权重时的两个向量以及被加权的向量,导致其不容易学习
- 只考虑了两个输入序列单元之间的关系,无法建模多个输入序列单元之间更复杂的关系
- 自注意力计算结果互斥,无法同时关注多个输入
融合了以下方案的自注意力模型拥有一个非常炫酷的名字——Transformer(变换器、转换器、变压器、变形金刚)
-
融入位置信息
为序列中每个输入的向量引入不同的位置信息以示区分。
引入方式:
-
位置嵌入
为序列中每个绝对位置赋予一个连续、低维、稠密的向量表示
-
位置编码
使用函数 ,直接将一个整数(位置索引值)映射到一个d维向量上。
映射公式:

解决了原始自注意力模型无法对序列进行建模的问题
-
-
输入向量角色信息
原始的自注意力模型计算注意力时直接使用两个输入向量,然后使用得到的注意力对同一个输入向量加权,这样导致一个输入向量同时承担了三种角色:查询(Query)、键(Key)和值(Value)。
→ 对不同的角色使用不同的向量,使用不同的参数矩阵对原始的输入向量做线性变换,从而让不同的变换结果承担不同的角色
新的输出向量计算公式:

-
多层自注意力
在实际应用中,往往需要同时考虑更多输入序列单元之间的关系,即更高阶的关系。
为了增强模型的表示能力,往往在每层自注意力计算之后,增加一个非线性的多层感知器(MLP)模型。为了使模型更容易学习,还可以使用层归一化(Layer Normalization)、残差连接(Residual Connections)等深度学习的训练技巧
自注意力层、非线性层以及以上的这些训练技巧,构成了一个更大的Transformer层,也叫作Transformer块(Block)。
-
自注意力计算结果互斥
自注意力结果之间是互斥的,无法同时关注多个输入。
设置多组映射矩阵即可,然后将产生的多个输出向量拼接。为了将输出结果作为下一组的输入,还需要将拼接后的输出向量再经过一个线性映射,映射回d维向量。 → 多头自注意力(Multi-head Self-attention)模型。
4.4.4 基于Transformer的序列到序列模型
与循环神经网络类似,Transformer也可以很容易地实现解码功能,两者结合,就实现了一个序列到序列的模型,可以完成机器翻译等多种自然语言处理任务。
4.4.5 Transformer模型的优缺点
- 优
- 对于长序列建模的能力更强
- 具有更高的训练速度
- 缺
- 参数量过于庞大
4.4.6 模型实现
nn.TransformerEncoder实现了编码模块,由多层Transformer块构成的,每个块使用TransformerEncoderLayer实现
import torch
from torch import nn
#### 编码模块
# 创建一个transformer块,每个输入、输出向量维度为4,头数为2
encoder_layer = nn.TransformerEncoderLayer(d_model= 4, nhead=2)
# 随机生成输入,(序列长度、批次大小、每个输入向量的维度)
src = torch.rand(2,3,4)
out = encoder_layer(src)
print(out)
## 将多个块堆叠,构成一个完整的nn.TransformerEncoder
transformer_encoder = nn.TransformerEncoder(encoder_layer,num_layers=6)
out = transformer_encoder(src)
print(out)
#### 解码模块
memory = transformer_encoder(src)
decoder_layer = nn.TransformerDecoderLayer(d_model= 4, nhead=2)
transformer_decoder = nn.TransformerDecoder(decoder_layer,num_layers=6)
out_part = torch.rand(2,3,4)
out = transformer_decoder(out_part,memory)
print(out)
4.5 神经网络模型的训练
寻找一组优化参数的过程又叫作模型训练或学习。
4.5.1 损失函数
为了评估一组参数的好坏,需要有一个准则,在机器学习中,又被称为损失函数(Loss Function)。
损失函数用于衡量在训练数据集上模型的输出与真实输出之间的差异。
损失函数的值越小,模型输出与真实输出越相似,可以认为此时模型表现越好,不过如果损失函数的值过小,那么模型就会与训练数据集过拟合(Overfit),反倒不适用于新的数据。
避免过拟合:
- 正则化
- 丢弃正则化
- 早停法
两种常用的损失函数:
- 均方误差(Mean Squared Error,MSE)

预测结果与真实结果越相似,均方误差损失越小。适用于回归问题。
-
交叉熵(Cross-Entropy,CE)损失
学习速度更快 → 分类问题

最终交叉熵损失只取决于模型对正确类别预测概率的对数值。如果模型表现越好,则预测的概率越
大,由于公式右侧前面还有一个负号,所以交叉熵损失越小(这符合直觉)。
简化:
4.5.2 梯度下降
梯度下降(Gradient Descent,GD)是一种非常基础和常用的参数优化方法。
梯度:以向量的形式写出的对多元函数各个参数求得的偏导数。物理意义是函数值增加最快的方向,或者说,沿着梯度的方向更加容易找到函数的极大值,沿着梯度相反的方向,更加容易找到函数的极小值。
梯度下降算法:
当训练数据的规模比较大时,如果每次都遍历全部的训练数据计算梯度,算法的运行时间会非常久。
小批次梯度下降(Mini-batch Gradient Descent):每次可以随机采样一定规模的训练数据估计梯度。

随机梯度下降(Stochastic Gradient Descent,SGD):当小批次的数目被设为b=1。
使用梯度下降法获得多层感知器的优化参数,解决异或问题
import torch
from torch import nn,optim
class MLP(nn.Module):
def __init__(self,input_dim,hidden_dim,num_class):
super(MLP,self).__init__()
self.linear1 = nn.Linear(input_dim,hidden_dim)
self.activate = torch.relu
self.linear2 = nn.Linear(hidden_dim,num_class)
def forward(self,inputs):
hidden = self.linear1(inputs)
activation = self.activate(hidden)
outputs = self.linear2(activation)
log_probs = torch.log_softmax(outputs,dim= 1)
return log_probs
# 异或问题的4个输入
x_train = torch.tensor([[0.0,0.0],[0.0,1.0],[1.0,0.0],[1.0,1.0]])
# 每个输入对应的输出类别
y_train = torch.tensor([0,1,1,0])
# 创建多层感知器,输入层大小为2,隐含层大小为5,输出层大小为2,即有两个类别
model = MLP(input_dim=2,hidden_dim=5,num_class=2)
# 调用负对数似然损失
criterion = nn.NLLLoss()
#使用梯度下降参数优化方法,学习率 0.05
optimizer = optim.SGD(model.parameters(),lr=0.05)
for epoch in range(500):
#调用模型,预测输出结果
y_pred = model(x_train)
# 计算损失
loss = criterion(y_pred,y_train)
## 将优化器的梯度值置0,否则每次循环的梯度会累加
optimizer.zero_grad()
## 通过反向传播计算参数梯度
loss.backward()
# 在优化器中更新参数
optimizer.step()
print("Parameters:")
for name,param in model.named_parameters():
print(name,param.data)
y_pred = model(x_train)
print("Predicted results:",y_pred.argmax(axis=1))

其他优化器:
- Adam
- Adagrad
- Adadelta
4.6 情感分类实战
4.6.1 词表映射
将输入的语言符号,通常为标记(Token),映射为大于等于0、小于词表大小的整数,该整数也被称作一个标记的索引值或下标。
实现标记和索引之间的相互映射:
from collections import defaultdict, Counter
class Vocab:
def __init__(self, tokens=None):
self.idx_to_token = list()
self.token_to_idx = dict()
if tokens is not None:
if "<unk>" not in tokens:
tokens = tokens + ["<unk>"]
for token in tokens:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
self.unk = self.token_to_idx['<unk>']
@classmethod
def build(cls, text, min_freq=1, reserved_tokens=None):
token_freqs = defaultdict(int)
for sentence in text:
for token in sentence:
token_freqs[token] += 1
uniq_tokens = ["<unk>"] + (reserved_tokens if reserved_tokens else [])
uniq_tokens += [token for token, freq in token_freqs.items() \
if freq >= min_freq and token != "<unk>"]
return cls(uniq_tokens)
def __len__(self):
return len(self.idx_to_token)
def __getitem__(self, token):
return self.token_to_idx.get(token, self.unk)
def convert_tokens_to_ids(self, tokens):
return [self[token] for token in tokens]
def convert_ids_to_tokens(self, indices):
return [self.idx_to_token[index] for index in indices]
def save_vocab(vocab, path):
with open(path, 'w') as writer:
writer.write("\n".join(vocab.idx_to_token))
def read_vocab(path):
with open(path, 'r') as f:
tokens = f.read().split('\n')
return Vocab(tokens)
4.6.2 词向量层
将一个词(或者标记)转换为一个低维、稠密、连续的词向量(也称Embed-ding)是一种基本的词表示方法,通过torch.nn包提供的Embedding层即可实现该功能。
创建Embedding对象时,需要提供两个参数,分别是num_embeddings,即词表的大小;以及embedding_dim,即Embedding向量的维度。
功能:将输入的整数张量中每个整数(通过词表映射功能获得标记对应的整数)映射为相应维度(embedding_dim)的张量。
4.6.3 融入词向量层以及词袋模型的多层感知器
在经过多层感知器之前,需要利用词向量层将输入的整数映射为向量。
import torch
from torch import nn
from torch.nn import functional as F
class MLP(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
super(MLP, self).__init__()
# 词嵌入层
self.embedding = nn.Embedding(vocab_size, embedding_dim)
# 线性变换:词嵌入层->隐含层
self.linear1 = nn.Linear(embedding_dim, hidden_dim)
# 使用ReLU激活函数
self.activate = F.relu
# 线性变换:激活层->输出层
self.linear2 = nn.Linear(hidden_dim, num_class)
def forward(self, inputs):
embeddings = self.embedding(inputs)
# 将序列中多个embedding进行聚合(此处是求平均值)
embedding = embeddings.mean(dim=1)
hidden = self.activate(self.linear1(embedding))
outputs = self.linear2(hidden)
# 获得每个序列属于某一类别概率的对数值
probs = F.log_softmax(outputs, dim=1)
return probs
mlp = MLP(vocab_size=8, embedding_dim=3, hidden_dim=5, num_class=2)
# 输入为两个长度为4的整数序列
inputs = torch.tensor([[0, 1, 2, 1], [4, 6, 6, 7]], dtype=torch.long)
outputs = mlp(inputs)
print(outputs)
4.6.4 数据处理
数据处理的第一步自然是将待处理的数据从硬盘或者其他地方加载到程序中,此时读入的是原始文本数据,还需要经过分句、标记解析等预处理过程转换为标记序列,然后再使用词表映射工具将每个标记映射到相应的索引值。
import torch
from vocab import Vocab
def load_sentence_polarity():
from nltk.corpus import sentence_polarity
vocab = Vocab.build(sentence_polarity.sents())
train_data = [(vocab.convert_tokens_to_ids(sentence), 0)
for sentence in sentence_polarity.sents(categories='pos')[:4000]] \
+ [(vocab.convert_tokens_to_ids(sentence), 1)
for sentence in sentence_polarity.sents(categories='neg')[:4000]]
test_data = [(vocab.convert_tokens_to_ids(sentence), 0)
for sentence in sentence_polarity.sents(categories='pos')[4000:]] \
+ [(vocab.convert_tokens_to_ids(sentence), 1)
for sentence in sentence_polarity.sents(categories='neg')[4000:]]
return train_data, test_data, vocab
def length_to_mask(lengths):
max_len = torch.max(lengths)
mask = torch.arange(max_len).expand(lengths.shape[0], max_len) < lengths.unsqueeze(1)
return mask
def load_treebank():
from nltk.corpus import treebank
sents, postags = zip(*(zip(*sent) for sent in treebank.tagged_sents()))
vocab = Vocab.build(sents, reserved_tokens=["<pad>"])
tag_vocab = Vocab.build(postags)
train_data = [(vocab.convert_tokens_to_ids(sentence), tag_vocab.convert_tokens_to_ids(tags)) for sentence, tags in zip(sents[:3000], postags[:3000])]
test_data = [(vocab.convert_tokens_to_ids(sentence), tag_vocab.convert_tokens_to_ids(tags)) for sentence, tags in zip(sents[3000:], postags[3000:])]
return train_data, test_data, vocab, tag_vocab
多层感知器模型的训练与测试
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
from vocab import Vocab
from utils import load_sentence_polarity
class BowDataset(Dataset):
def __init__(self, data):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, i):
return self.data[i]
def collate_fn(examples):
inputs = [torch.tensor(ex[0]) for ex in examples]
targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
offsets = [0] + [i.shape[0] for i in inputs]
offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
inputs = torch.cat(inputs)
return inputs, offsets, targets
class MLP(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
super(MLP, self).__init__()
self.embedding = nn.EmbeddingBag(vocab_size, embedding_dim)
self.linear1 = nn.Linear(embedding_dim, hidden_dim)
self.activate = F.relu
self.linear2 = nn.Linear(hidden_dim, num_class)
def forward(self, inputs, offsets):
embedding = self.embedding(inputs, offsets)
hidden = self.activate(self.linear1(embedding))
outputs = self.linear2(hidden)
log_probs = F.log_softmax(outputs, dim=1)
return log_probs
# tqdm是一个Python模块,能以进度条的方式显示迭代的进度
from tqdm.auto import tqdm
# 超参数设置
embedding_dim = 128
hidden_dim = 256
num_class = 2
batch_size = 32
num_epoch = 5
# 加载数据
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = BowDataset(train_data)
test_dataset = BowDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
# 加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MLP(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) # 将模型加载到CPU或GPU设备
#训练过程
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # 使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
inputs, offsets, targets = [x.to(device) for x in batch]
log_probs = model(inputs, offsets)
loss = nll_loss(log_probs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}")
# 测试过程
acc = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, offsets, targets = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs, offsets)
acc += (output.argmax(dim=1) == targets).sum().item()
# 输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader):.2f}")

4.6.6 基于卷积神经网络的情感分类
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import defaultdict
from vocab import Vocab
from utils import load_sentence_polarity
class CnnDataset(Dataset):
def __init__(self, data):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, i):
return self.data[i]
def collate_fn(examples):
inputs = [torch.tensor(ex[0]) for ex in examples]
targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
# 对batch内的样本进行padding,使其具有相同长度
inputs = pad_sequence(inputs, batch_first=True)
return inputs, targets
class CNN(nn.Module):
def __init__(self, vocab_size, embedding_dim, filter_size, num_filter, num_class):
super(CNN, self).__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.conv1d = nn.Conv1d(embedding_dim, num_filter, filter_size, padding=1)
self.activate = F.relu
self.linear = nn.Linear(num_filter, num_class)
def forward(self, inputs):
embedding = self.embedding(inputs)
convolution = self.activate(self.conv1d(embedding.permute(0, 2, 1)))
pooling = F.max_pool1d(convolution, kernel_size=convolution.shape[2])
outputs = self.linear(pooling.squeeze(dim=2))
log_probs = F.log_softmax(outputs, dim=1)
return log_probs
#tqdm是一个Pyth模块,能以进度条的方式显示迭代的进度
from tqdm.auto import tqdm
#超参数设置
embedding_dim = 128
hidden_dim = 256
num_class = 2
batch_size = 32
num_epoch = 5
filter_size = 3
num_filter = 100
#加载数据
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = CnnDataset(train_data)
test_dataset = CnnDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
#加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CNN(len(vocab), embedding_dim, filter_size, num_filter, num_class)
model.to(device) #将模型加载到CPU或GPU设备
#训练过程
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) #使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
inputs, targets = [x.to(device) for x in batch]
log_probs = model(inputs)
loss = nll_loss(log_probs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}")
#测试过程
acc = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, targets = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs)
acc += (output.argmax(dim=1) == targets).sum().item()
#输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader):.2f}")

4.6.7 基于循环神经网络的情感分类
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from collections import defaultdict
from vocab import Vocab
from utils import load_sentence_polarity
#tqdm是一个Python模块,能以进度条的方式显式迭代的进度
from tqdm.auto import tqdm
class LstmDataset(Dataset):
def __init__(self, data):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, i):
return self.data[i]
def collate_fn(examples):
lengths = torch.tensor([len(ex[0]) for ex in examples])
inputs = [torch.tensor(ex[0]) for ex in examples]
targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
# 对batch内的样本进行padding,使其具有相同长度
inputs = pad_sequence(inputs, batch_first=True)
return inputs, lengths, targets
class LSTM(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
super(LSTM, self).__init__()
self.embeddings = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
self.output = nn.Linear(hidden_dim, num_class)
def forward(self, inputs, lengths):
embeddings = self.embeddings(inputs)
x_pack = pack_padded_sequence(embeddings, lengths, batch_first=True, enforce_sorted=False)
hidden, (hn, cn) = self.lstm(x_pack)
outputs = self.output(hn[-1])
log_probs = F.log_softmax(outputs, dim=-1)
return log_probs
embedding_dim = 128
hidden_dim = 256
num_class = 2
batch_size = 32
num_epoch = 5
#加载数据
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = LstmDataset(train_data)
test_dataset = LstmDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
#加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTM(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) #将模型加载到GPU中(如果已经正确安装)
#训练过程
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) #使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
inputs, lengths, targets = [x.to(device) for x in batch]
log_probs = model(inputs, lengths)
loss = nll_loss(log_probs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}")
#测试过程
acc = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, lengths, targets = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs, lengths)
acc += (output.argmax(dim=1) == targets).sum().item()
#输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader):.2f}")
4.6.8 基于Transformer的情感分类
import math
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from collections import defaultdict
from vocab import Vocab
from utils import load_sentence_polarity, length_to_mask
# tqdm是一个Pyth模块,能以进度条的方式显式迭代的进度
from tqdm.auto import tqdm
class TransformerDataset(Dataset):
def __init__(self, data):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, i):
return self.data[i]
def collate_fn(examples):
lengths = torch.tensor([len(ex[0]) for ex in examples])
inputs = [torch.tensor(ex[0]) for ex in examples]
targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
# 对batch内的样本进行padding,使其具有相同长度
inputs = pad_sequence(inputs, batch_first=True)
return inputs, lengths, targets
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=512):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:x.size(0), :]
return x
class Transformer(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class,
dim_feedforward=512, num_head=2, num_layers=2, dropout=0.1, max_len=128, activation: str = "relu"):
super(Transformer, self).__init__()
# 词嵌入层
self.embedding_dim = embedding_dim
self.embeddings = nn.Embedding(vocab_size, embedding_dim)
self.position_embedding = PositionalEncoding(embedding_dim, dropout, max_len)
# 编码层:使用Transformer
encoder_layer = nn.TransformerEncoderLayer(hidden_dim, num_head, dim_feedforward, dropout, activation)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
# 输出层
self.output = nn.Linear(hidden_dim, num_class)
def forward(self, inputs, lengths):
inputs = torch.transpose(inputs, 0, 1)
hidden_states = self.embeddings(inputs)
hidden_states = self.position_embedding(hidden_states)
attention_mask = length_to_mask(lengths) == False
hidden_states = self.transformer(hidden_states, src_key_padding_mask=attention_mask)
hidden_states = hidden_states[0, :, :]
output = self.output(hidden_states)
log_probs = F.log_softmax(output, dim=1)
return log_probs
embedding_dim = 128
hidden_dim = 128
num_class = 2
batch_size = 32
num_epoch = 5
# 加载数据
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = TransformerDataset(train_data)
test_dataset = TransformerDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
# 加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Transformer(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) # 将模型加载到GPU中(如果已经正确安装)
# 训练过程
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # 使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
inputs, lengths, targets = [x.to(device) for x in batch]
log_probs = model(inputs, lengths)
loss = nll_loss(log_probs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}")
# 测试过程
acc = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, lengths, targets = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs, lengths)
acc += (output.argmax(dim=1) == targets).sum().item()
# 输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader):.2f}")
4.7 词性标注实战
4.7.1 基于前馈神经网络的词性标注
将词性标注任务看作多类别文本分类问题,即取目标词的上下文词作为输入,目标词的词性作为输出类别。
4.7.2 基于循环神经网络的词性标注
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from collections import defaultdict
from vocab import Vocab
from utils import load_treebank
#tqdm是一个Python模块,能以进度条的方式显式迭代的进度
from tqdm.auto import tqdm
WEIGHT_INIT_RANGE = 0.1
class LstmDataset(Dataset):
def __init__(self, data):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, i):
return self.data[i]
def collate_fn(examples):
lengths = torch.tensor([len(ex[0]) for ex in examples])
inputs = [torch.tensor(ex[0]) for ex in examples]
targets = [torch.tensor(ex[1]) for ex in examples]
inputs = pad_sequence(inputs, batch_first=True, padding_value=vocab["<pad>"])
targets = pad_sequence(targets, batch_first=True, padding_value=vocab["<pad>"])
return inputs, lengths, targets, inputs != vocab["<pad>"]
def init_weights(model):
for param in model.parameters():
torch.nn.init.uniform_(param, a=-WEIGHT_INIT_RANGE, b=WEIGHT_INIT_RANGE)
class LSTM(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
super(LSTM, self).__init__()
self.embeddings = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
self.output = nn.Linear(hidden_dim, num_class)
init_weights(self)
def forward(self, inputs, lengths):
embeddings = self.embeddings(inputs)
x_pack = pack_padded_sequence(embeddings, lengths, batch_first=True, enforce_sorted=False)
hidden, (hn, cn) = self.lstm(x_pack)
hidden, _ = pad_packed_sequence(hidden, batch_first=True)
outputs = self.output(hidden)
log_probs = F.log_softmax(outputs, dim=-1)
return log_probs
embedding_dim = 128
hidden_dim = 256
batch_size = 32
num_epoch = 5
#加载数据
train_data, test_data, vocab, pos_vocab = load_treebank()
train_dataset = LstmDataset(train_data)
test_dataset = LstmDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
num_class = len(pos_vocab)
#加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTM(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) #将模型加载到GPU中(如果已经正确安装)
#训练过程
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) #使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
inputs, lengths, targets, mask = [x.to(device) for x in batch]
log_probs = model(inputs, lengths)
loss = nll_loss(log_probs[mask], targets[mask])
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}")
#测试过程
acc = 0
total = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, lengths, targets, mask = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs, lengths)
acc += (output.argmax(dim=-1) == targets)[mask].sum().item()
total += mask.sum().item()
#输出在测试集上的准确率
print(f"Acc: {acc / total:.2f}")
4.7.3 基于Transformer 的词性标注
# Defined in Section 4.7.3
import math
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from collections import defaultdict
from vocab import Vocab
from utils import load_treebank, length_to_mask
#tqdm是一个Pyth模块,能以进度条的方式显式迭代的进度
from tqdm.auto import tqdm
class TransformerDataset(Dataset):
def __init__(self, data):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, i):
return self.data[i]
def collate_fn(examples):
lengths = torch.tensor([len(ex[0]) for ex in examples])
inputs = [torch.tensor(ex[0]) for ex in examples]
targets = [torch.tensor(ex[1]) for ex in examples]
# 对batch内的样本进行padding,使其具有相同长度
inputs = pad_sequence(inputs, batch_first=True, padding_value=vocab["<pad>"])
targets = pad_sequence(targets, batch_first=True, padding_value=vocab["<pad>"])
return inputs, lengths, targets, inputs != vocab["<pad>"]
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=512):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:x.size(0), :]
return x
class Transformer(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class,
dim_feedforward=512, num_head=2, num_layers=2, dropout=0.1, max_len=512, activation: str = "relu"):
super(Transformer, self).__init__()
# 词嵌入层
self.embedding_dim = embedding_dim
self.embeddings = nn.Embedding(vocab_size, embedding_dim)
self.position_embedding = PositionalEncoding(embedding_dim, dropout, max_len)
# 编码层:使用Transformer
encoder_layer = nn.TransformerEncoderLayer(hidden_dim, num_head, dim_feedforward, dropout, activation)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
# 输出层
self.output = nn.Linear(hidden_dim, num_class)
def forward(self, inputs, lengths):
inputs = torch.transpose(inputs, 0, 1)
hidden_states = self.embeddings(inputs)
hidden_states = self.position_embedding(hidden_states)
attention_mask = length_to_mask(lengths) == False
hidden_states = self.transformer(hidden_states, src_key_padding_mask=attention_mask).transpose(0, 1)
logits = self.output(hidden_states)
log_probs = F.log_softmax(logits, dim=-1)
return log_probs
embedding_dim = 128
hidden_dim = 128
batch_size = 32
num_epoch = 5
#加载数据
train_data, test_data, vocab, pos_vocab = load_treebank()
train_dataset = TransformerDataset(train_data)
test_dataset = TransformerDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
num_class = len(pos_vocab)
#加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Transformer(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) #将模型加载到GPU中(如果已经正确安装)
#训练过程
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) #使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
inputs, lengths, targets, mask = [x.to(device) for x in batch]
log_probs = model(inputs, lengths)
loss = nll_loss(log_probs[mask], targets[mask])
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}")
#测试过程
acc = 0
total = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, lengths, targets, mask = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs, lengths)
acc += (output.argmax(dim=-1) == targets)[mask].sum().item()
total += mask.sum().item()
#输出在测试集上的准确率
print(f"Acc: {acc / total:.2f}")
aset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
num_class = len(pos_vocab)
#加载模型
device = torch.device(‘cuda’ if torch.cuda.is_available() else ‘cpu’)
model = Transformer(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) #将模型加载到GPU中(如果已经正确安装)
#训练过程
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) #使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}“):
inputs, lengths, targets, mask = [x.to(device) for x in batch]
log_probs = model(inputs, lengths)
loss = nll_loss(log_probs[mask], targets[mask])
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}”)
#测试过程
acc = 0
total = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, lengths, targets, mask = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs, lengths)
acc += (output.argmax(dim=-1) == targets)[mask].sum().item()
total += mask.sum().item()
#输出在测试集上的准确率
print(f"Acc: {acc / total:.2f}")
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)