A Hands-On Guide to Chinese Sentiment Analysis with LSTM

Abstract: This article presents a detailed approach to Chinese sentiment analysis based on LSTM networks. It first covers the theoretical foundations of RNNs and LSTMs, focusing on how the LSTM gating mechanism addresses the vanishing gradient problem. It then walks through the key steps of Chinese text preprocessing, including word segmentation with jieba, stopword handling, and text vectorization. The article provides a complete model-building workflow, from environment setup and data preparation to the LSTM implementation, along with training, evaluation, and optimization techniques. A complete, runnable code example is given at the end as a practical reference for Chinese sentiment analysis.

Introduction
Sentiment analysis is an important task in natural language processing. It aims to identify and extract subjective information from text and to determine the sentiment a text expresses (positive, negative, or neutral). It is widely used in e-commerce review analysis, public opinion monitoring, social media analytics, and more.

Compared with English, Chinese sentiment analysis poses some unique challenges:
- No whitespace delimiters: Chinese has no natural separators between words, so word segmentation is required
- Polysemy: the same word can carry different sentiment in different contexts
- Indirect expression: Chinese tends to be understated, so judging sentiment often requires more context

This article walks through building a Chinese sentiment analysis model with an LSTM (Long Short-Term Memory) network.
Theoretical Foundations

Recurrent Neural Networks (RNN)

A recurrent neural network is a class of neural networks designed for sequential data. Unlike a traditional feed-forward network, an RNN adds recurrent connections between hidden states, giving the network a "memory" of earlier inputs.

The basic RNN structure (unrolled over time):
     ┌─────────┐      ┌─────────┐      ┌─────────┐
────▶│   h_t   │─────▶│  h_t+1  │─────▶│  h_t+2  │────▶ …
     └─────────┘      └─────────┘      └─────────┘
          ▲                ▲                ▲
          │                │                │
     ┌─────────┐      ┌─────────┐      ┌─────────┐
     │   x_t   │      │  x_t+1  │      │  x_t+2  │
     └─────────┘      └─────────┘      └─────────┘
The RNN update equations:

h_t = tanh(W_xh · x_t + W_hh · h_{t-1} + b_h)
y_t = W_hy · h_t + b_y

where:
- x_t is the input at time step t
- h_t is the hidden state at time step t
- y_t is the output at time step t
- W and b are the weight matrices and bias vectors

The limitation of RNNs: the vanishing gradient problem.

When processing long sequences, RNNs suffer from vanishing (or exploding) gradients. As the sequence grows, signals from early time steps decay during backpropagation, so the model struggles to learn long-range dependencies.
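To make this decay concrete, here is a small illustrative sketch (not part of the article's pipeline): it unrolls a one-dimensional RNN and accumulates the per-step Jacobian factor w · (1 − h_t²). With |w| < 1 the product collapses toward zero, which is exactly the vanishing gradient.

```python
import numpy as np

# Toy 1-D RNN: h_t = tanh(w * h_{t-1} + input). The gradient of h_T
# w.r.t. h_0 is the product of the per-step factors w * (1 - h_t^2).
w = 0.5            # recurrent weight, |w| < 1
h, grad = 0.0, 1.0
for t in range(50):
    h = np.tanh(w * h + 0.1)   # fixed input, for illustration only
    grad *= w * (1 - h ** 2)   # factor d h_t / d h_{t-1}
print(f"gradient w.r.t. h_0 after 50 steps: {grad:.3e}")  # vanishingly small
```

After 50 steps the accumulated gradient is on the order of 1e-15, so any error signal from the start of the sequence is effectively lost.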
A Closer Look at LSTM

LSTM (Long Short-Term Memory) is a special RNN architecture that introduces a gating mechanism to mitigate the vanishing gradient problem, allowing the network to learn long-term dependencies effectively.

Core components of an LSTM:

An LSTM cell contains three gates and a cell state:
          ┌─────────────────────────────────────────┐
          │                LSTM Cell                │
          │                                         │
c_{t-1} ──│──[×]────────[+]─────────────────────────│──▶ c_t
          │   ▲          ▲                          │
          │   │    ┌─────┴─────┐                    │
          │   │    │           │                    │
          │   │ [tanh]       [×]◀── i_t (input gate)│
          │   │    ▲           ▲                    │
          │   │    │           │                    │
          │  f_t   └────┬──────┘                    │
          │ (forget     │                           │
          │  gate)      │                           │
h_{t-1} ──│─────────────┼──────────[×]──────────────│──▶ h_t
          │             │           ▲               │
x_t ──────│─────────────┴───────[tanh]◀─[×]◀── o_t  │
          │                        (output gate)    │
          └─────────────────────────────────────────┘
The three gates:

- Forget gate: decides what information to discard from the cell state
  f_t = σ(W_f · [h_{t-1}, x_t] + b_f)
- Input gate: decides which new information to store in the cell state
  i_t = σ(W_i · [h_{t-1}, x_t] + b_i)
  C̃_t = tanh(W_C · [h_{t-1}, x_t] + b_C)
- Output gate: decides what to output
  o_t = σ(W_o · [h_{t-1}, x_t] + b_o)

Cell state update:

C_t = f_t ⊙ C_{t-1} + i_t ⊙ C̃_t

Hidden state update:

h_t = o_t ⊙ tanh(C_t)

where σ is the sigmoid function and ⊙ denotes element-wise multiplication.

Why does the LSTM mitigate vanishing gradients?

- The cell state C_t is carried between time steps along a mostly linear path, so gradients can flow with little attenuation
- The forget gate can learn to preserve long-term memory
- The gating mechanism lets the network selectively remember or forget information
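The gate equations above map directly onto code. The following is a minimal NumPy sketch of one LSTM step (a toy, not the article's PyTorch model); the weight dictionary `W` with keys `'f'`, `'i'`, `'c'`, `'o'` is a naming choice made here for clarity:

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def lstm_step(x_t, h_prev, c_prev, W, b):
    """One LSTM step; W maps the concatenated [h_{t-1}, x_t] to gate pre-activations."""
    z = np.concatenate([h_prev, x_t])
    f = sigmoid(W['f'] @ z + b['f'])         # forget gate
    i = sigmoid(W['i'] @ z + b['i'])         # input gate
    c_tilde = np.tanh(W['c'] @ z + b['c'])   # candidate cell state
    o = sigmoid(W['o'] @ z + b['o'])         # output gate
    c_t = f * c_prev + i * c_tilde           # cell state update
    h_t = o * np.tanh(c_t)                   # hidden state update
    return h_t, c_t

# Toy dimensions: hidden size 3, input size 2
rng = np.random.default_rng(0)
H, D = 3, 2
W = {k: rng.normal(scale=0.1, size=(H, H + D)) for k in 'fico'}
b = {k: np.zeros(H) for k in 'fico'}
h, c = np.zeros(H), np.zeros(H)
for x in rng.normal(size=(5, D)):            # run 5 time steps
    h, c = lstm_step(x, h, c, W, b)
print(h.shape, c.shape)  # (3,) (3,)
```

Note the cell-state line `c_t = f * c_prev + i * c_tilde`: it is an element-wise, additive update, which is the linear path that lets gradients survive many time steps.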
Overview of the Sentiment Analysis Task

Sentiment analysis can be framed at several levels:

| Level | Description | Example |
|---|---|---|
| Document-level | Classify the sentiment of an entire document | Decide whether a movie review is positive or negative |
| Sentence-level | Classify the sentiment of a single sentence | Judge the sentiment of "这部电影太棒了" ("This movie is great") |
| Aspect-level | Identify sentiment toward specific aspects | In "手机拍照不错但电池不行" ("the camera is good but the battery is poor"), the camera and battery carry different sentiment |

This article focuses on sentence-level sentiment analysis as a binary classification task (positive/negative).
Chinese Text Preprocessing

Text preprocessing is a key step in building a high-quality sentiment analysis model.

Chinese Word Segmentation

Because Chinese has no natural word delimiters, a segmentation tool is needed to split sentences into word sequences. Commonly used tools include:

| Tool | Characteristics | Typical use |
|---|---|---|
| jieba | Lightweight, easy to use | General purpose |
| pkuseg | High accuracy | Academic research |
| HanLP | Full-featured | Enterprise applications |
| LTP | From HIT (Harbin Institute of Technology) | Deep NLP pipelines |

jieba segmentation example:
```python
import jieba

text = "这部电影的剧情非常精彩,演员演技也很出色!"

# Accurate mode (the default)
words = jieba.lcut(text)
print("Accurate mode:", words)
# Output: ['这部', '电影', '的', '剧情', '非常', '精彩', ',', '演员', '演技', '也', '很', '出色', '!']

# Full mode
words_full = jieba.lcut(text, cut_all=True)
print("Full mode:", words_full)

# Search-engine mode
words_search = jieba.lcut_for_search(text)
print("Search-engine mode:", words_search)
```
Adding a custom dictionary:

```python
# Add individual custom words
jieba.add_word("演技炸裂")
jieba.add_word("神仙打架")

# Load a custom dictionary file
jieba.load_userdict("custom_dict.txt")
```
Stopword Removal

Stopwords are words that occur frequently but contribute little to sentiment analysis, such as "的", "是", and "了".

Loading a stopword list:
```python
def load_stopwords(filepath):
    """Load a stopword list."""
    with open(filepath, 'r', encoding='utf-8') as f:
        stopwords = set(line.strip() for line in f)
    return stopwords

def remove_stopwords(words, stopwords):
    """Filter out stopwords."""
    return [word for word in words if word not in stopwords]

# Usage
stopwords = load_stopwords('stopwords.txt')
text = "这部电影的剧情非常精彩"
words = jieba.lcut(text)
filtered_words = remove_stopwords(words, stopwords)
print("After stopword removal:", filtered_words)
# Output: ['电影', '剧情', '非常', '精彩']
```
Common sources of Chinese stopword lists:
- The Baidu stopword list
- The HIT (Harbin Institute of Technology) stopword list
- The Sichuan University Machine Intelligence Lab stopword list

Text Vectorization

Converting text into numeric vectors is a prerequisite for deep learning models.

1. Building the vocabulary
```python
from collections import Counter

def build_vocab(texts, min_freq=1, max_vocab_size=50000):
    """
    Build a vocabulary.

    Args:
        texts: list of segmented texts; each element is a list of words
        min_freq: minimum word-frequency threshold
        max_vocab_size: maximum vocabulary size

    Returns:
        word2idx: mapping from word to index
        idx2word: mapping from index to word
    """
    # Count word frequencies
    word_counts = Counter()
    for text in texts:
        word_counts.update(text)

    # Keep the most common words above the frequency threshold
    words = [word for word, count in word_counts.most_common(max_vocab_size)
             if count >= min_freq]

    # Prepend the special tokens
    special_tokens = ['<PAD>', '<UNK>']
    words = special_tokens + words

    # Build the mappings
    word2idx = {word: idx for idx, word in enumerate(words)}
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word
```
2. Encoding text

```python
import numpy as np

def encode_texts(texts, word2idx, max_len=128):
    """
    Convert texts into index sequences.

    Args:
        texts: list of segmented texts
        word2idx: mapping from word to index
        max_len: maximum sequence length

    Returns:
        encoded: array of encoded texts
    """
    encoded = []
    for text in texts:
        # Map words to indices
        indices = [word2idx.get(word, word2idx['<UNK>']) for word in text]
        # Truncate or pad
        if len(indices) > max_len:
            indices = indices[:max_len]
        else:
            indices = indices + [word2idx['<PAD>']] * (max_len - len(indices))
        encoded.append(indices)
    return np.array(encoded)
```
3. Word embeddings

A word embedding maps discrete words into a continuous vector space, so that semantically similar words end up close to each other.

```python
import torch
import torch.nn as nn

class EmbeddingLayer(nn.Module):
    def __init__(self, vocab_size, embedding_dim, pretrained_embeddings=None):
        super(EmbeddingLayer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # Load pretrained word vectors if provided
        if pretrained_embeddings is not None:
            self.embedding.weight.data.copy_(torch.from_numpy(pretrained_embeddings))
            # Optional: freeze the embeddings
            # self.embedding.weight.requires_grad = False

    def forward(self, x):
        return self.embedding(x)
```

Commonly used pretrained Chinese word vectors:
- Tencent AI Lab Embedding
- Beijing Normal University Chinese word vectors
- fastText pretrained Chinese word vectors
Building the Model

Environment Setup

Dependencies:

```shell
# Create a virtual environment
conda create -n sentiment python=3.9
conda activate sentiment

# Install the dependencies
pip install torch torchvision torchaudio
pip install jieba
pip install numpy pandas scikit-learn
pip install matplotlib seaborn
pip install tqdm
```

requirements.txt:

```
torch>=1.9.0
jieba>=0.42.1
numpy>=1.21.0
pandas>=1.3.0
scikit-learn>=0.24.0
matplotlib>=3.4.0
seaborn>=0.11.0
tqdm>=4.61.0
```
Data Preparation

Dataset format:

Suppose we have a Chinese sentiment dataset in CSV format:

```
text,label
这家餐厅的菜品非常美味,服务态度也很好!,1
质量太差了,用了一天就坏了,不推荐购买。,0
还行吧,中规中矩,没有特别惊艳的地方。,0
超级喜欢这本书,作者写得太棒了!,1
```

A dataset and preprocessing class:
```python
import pandas as pd
import jieba
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class SentimentDataset(Dataset):
    """Chinese sentiment analysis dataset."""

    def __init__(self, texts, labels, word2idx, max_len=128):
        self.texts = texts
        self.labels = labels
        self.word2idx = word2idx
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        # Segment
        words = jieba.lcut(text)
        # Encode
        indices = [self.word2idx.get(word, self.word2idx['<UNK>']) for word in words]
        # Truncate or pad
        if len(indices) > self.max_len:
            indices = indices[:self.max_len]
        else:
            indices = indices + [self.word2idx['<PAD>']] * (self.max_len - len(indices))
        return torch.tensor(indices, dtype=torch.long), torch.tensor(label, dtype=torch.long)

def prepare_data(csv_path, test_size=0.2, random_state=42):
    """Prepare the training and test data."""
    # Read the data
    df = pd.read_csv(csv_path)
    texts = df['text'].tolist()
    labels = df['label'].tolist()
    # Build the vocabulary
    all_words = []
    for text in texts:
        all_words.append(jieba.lcut(text))
    word2idx, idx2word = build_vocab(all_words, min_freq=2)
    # Split the dataset
    train_texts, test_texts, train_labels, test_labels = train_test_split(
        texts, labels, test_size=test_size, random_state=random_state, stratify=labels
    )
    # Create the datasets
    train_dataset = SentimentDataset(train_texts, train_labels, word2idx)
    test_dataset = SentimentDataset(test_texts, test_labels, word2idx)
    return train_dataset, test_dataset, word2idx, idx2word
```
LSTM Model Implementation

A basic LSTM model:

```python
import torch
import torch.nn as nn

class LSTMSentimentClassifier(nn.Module):
    """LSTM-based sentiment classification model."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5, pad_idx=0):
        super(LSTMSentimentClassifier, self).__init__()
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )
        # Fully connected layer
        fc_input_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(fc_input_dim, output_dim)
        # Dropout layer
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """
        Forward pass.

        Args:
            text: [batch_size, seq_len]

        Returns:
            output: [batch_size, output_dim]
        """
        # Embedding: [batch_size, seq_len, embedding_dim]
        embedded = self.dropout(self.embedding(text))
        # LSTM: output [batch_size, seq_len, hidden_dim * num_directions]
        #       hidden [n_layers * num_directions, batch_size, hidden_dim]
        output, (hidden, cell) = self.lstm(embedded)
        # Concatenate the last-layer hidden states of a bidirectional LSTM
        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]
        # Fully connected layer
        hidden = self.dropout(hidden)
        output = self.fc(hidden)
        return output
```
An LSTM model with an attention mechanism:

```python
class AttentionLSTM(nn.Module):
    """LSTM sentiment classifier with an attention mechanism."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5, pad_idx=0):
        super(AttentionLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )
        # Attention projection
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.attention = nn.Linear(lstm_output_dim, 1)
        self.fc = nn.Linear(lstm_output_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def attention_layer(self, lstm_output, mask=None):
        """
        Attention layer.

        Args:
            lstm_output: [batch_size, seq_len, hidden_dim * num_directions]
            mask: [batch_size, seq_len], masks out padding positions

        Returns:
            context: [batch_size, hidden_dim * num_directions]
            attention_weights: [batch_size, seq_len]
        """
        # Attention scores: [batch_size, seq_len, 1]
        attention_scores = self.attention(lstm_output)
        attention_scores = attention_scores.squeeze(-1)  # [batch_size, seq_len]
        # Apply the mask
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, -1e10)
        # Softmax to get the attention weights
        attention_weights = torch.softmax(attention_scores, dim=1)
        # Weighted sum
        context = torch.bmm(attention_weights.unsqueeze(1), lstm_output)
        context = context.squeeze(1)  # [batch_size, hidden_dim * num_directions]
        return context, attention_weights

    def forward(self, text):
        # Embedding
        embedded = self.dropout(self.embedding(text))
        # LSTM
        lstm_output, (hidden, cell) = self.lstm(embedded)
        # Build the mask (padding positions are 0)
        mask = (text != 0).float()
        # Attention
        context, attention_weights = self.attention_layer(lstm_output, mask)
        # Classification
        output = self.fc(self.dropout(context))
        return output, attention_weights
```
Model hyperparameter configuration:

```python
class Config:
    # Data
    max_len = 128            # maximum sequence length
    min_freq = 2             # minimum word frequency
    # Model
    embedding_dim = 300      # embedding dimension
    hidden_dim = 256         # LSTM hidden dimension
    n_layers = 2             # number of LSTM layers
    bidirectional = True     # use a bidirectional LSTM
    dropout = 0.5            # dropout rate
    output_dim = 2           # number of output classes
    # Training
    batch_size = 64
    learning_rate = 1e-3
    epochs = 20
    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
```
Training and Evaluation

Training Loop

```python
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def train_epoch(model, dataloader, optimizer, criterion, device):
    """Train for one epoch."""
    model.train()
    total_loss = 0
    all_preds = []
    all_labels = []
    progress_bar = tqdm(dataloader, desc="Training")
    for texts, labels in progress_bar:
        texts, labels = texts.to(device), labels.to(device)
        optimizer.zero_grad()
        # Forward pass
        if isinstance(model, AttentionLSTM):
            outputs, _ = model(texts)
        else:
            outputs = model(texts)
        # Loss
        loss = criterion(outputs, labels)
        # Backward pass
        loss.backward()
        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item()
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        progress_bar.set_postfix({'loss': loss.item()})
    avg_loss = total_loss / len(dataloader)
    accuracy = accuracy_score(all_labels, all_preds)
    return avg_loss, accuracy

def evaluate(model, dataloader, criterion, device):
    """Evaluate the model."""
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for texts, labels in tqdm(dataloader, desc="Evaluating"):
            texts, labels = texts.to(device), labels.to(device)
            if isinstance(model, AttentionLSTM):
                outputs, _ = model(texts)
            else:
                outputs = model(texts)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    avg_loss = total_loss / len(dataloader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='binary'
    )
    return avg_loss, accuracy, precision, recall, f1

def train(model, train_loader, val_loader, config):
    """Full training loop."""
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=2
    )
    best_val_loss = float('inf')
    train_losses, val_losses = [], []
    train_accs, val_accs = [], []
    for epoch in range(config.epochs):
        print(f"\nEpoch {epoch + 1}/{config.epochs}")
        print("-" * 50)
        # Train
        train_loss, train_acc = train_epoch(
            model, train_loader, optimizer, criterion, config.device
        )
        # Validate
        val_loss, val_acc, precision, recall, f1 = evaluate(
            model, val_loader, criterion, config.device
        )
        # Adjust the learning rate
        scheduler.step(val_loss)
        # Record the metrics
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)
        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")
        print(f"Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f}")
        # Save the best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pt')
            print("✓ Best model saved!")
    return train_losses, val_losses, train_accs, val_accs
```
Visualizing the Training Process

```python
import matplotlib.pyplot as plt
import seaborn as sns

def plot_training_history(train_losses, val_losses, train_accs, val_accs):
    """Plot the training curves."""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    # Loss curves
    axes[0].plot(train_losses, label='Train Loss', marker='o')
    axes[0].plot(val_losses, label='Val Loss', marker='s')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('Training and Validation Loss')
    axes[0].legend()
    axes[0].grid(True)
    # Accuracy curves
    axes[1].plot(train_accs, label='Train Acc', marker='o')
    axes[1].plot(val_accs, label='Val Acc', marker='s')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy')
    axes[1].set_title('Training and Validation Accuracy')
    axes[1].legend()
    axes[1].grid(True)
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300)
    plt.show()

def plot_confusion_matrix(y_true, y_pred, labels=['Negative', 'Positive']):
    """Plot the confusion matrix."""
    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=labels, yticklabels=labels)
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.title('Confusion Matrix')
    plt.savefig('confusion_matrix.png', dpi=300)
    plt.show()
```
Visualizing Attention

```python
def visualize_attention(model, text, word2idx, idx2word, device):
    """Visualize the attention weights."""
    model.eval()
    # Segment and encode
    words = jieba.lcut(text)
    indices = [word2idx.get(word, word2idx['<UNK>']) for word in words]
    # Convert to a tensor
    input_tensor = torch.tensor([indices], dtype=torch.long).to(device)
    with torch.no_grad():
        output, attention_weights = model(input_tensor)
        pred = torch.argmax(output, dim=1).item()
    # Extract the attention weights
    attention = attention_weights[0][:len(words)].cpu().numpy()
    # Plot
    plt.figure(figsize=(12, 3))
    plt.bar(range(len(words)), attention)
    plt.xticks(range(len(words)), words, rotation=45, ha='right')
    plt.xlabel('Word')
    plt.ylabel('Attention weight')
    plt.title(f'Attention distribution (prediction: {"positive" if pred == 1 else "negative"})')
    plt.tight_layout()
    plt.savefig('attention_visualization.png', dpi=300)
    plt.show()
    return pred, attention
```
Model Optimization Techniques

1. Pretrained Word Vectors

Pretrained word vectors can substantially improve model performance:

```python
import numpy as np

def load_pretrained_embeddings(embedding_path, word2idx, embedding_dim):
    """Load pretrained word vectors."""
    # Initialize the embedding matrix
    vocab_size = len(word2idx)
    embedding_matrix = np.random.randn(vocab_size, embedding_dim) * 0.01
    # Zero out the PAD vector
    embedding_matrix[0] = np.zeros(embedding_dim)
    # Fill in the pretrained vectors
    found_count = 0
    with open(embedding_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.strip().split()
            word = values[0]
            if word in word2idx:
                idx = word2idx[word]
                vector = np.array(values[1:], dtype=np.float32)
                embedding_matrix[idx] = vector
                found_count += 1
    print(f"Found {found_count}/{vocab_size} words in pretrained embeddings")
    return embedding_matrix
```
2. Data Augmentation

```python
import random

def text_augmentation(text, word2idx):
    """Simple text augmentation."""
    words = jieba.lcut(text)
    # Random deletion
    if len(words) > 4 and random.random() < 0.1:
        idx = random.randint(0, len(words) - 1)
        words.pop(idx)
    # Random swap
    if len(words) > 2 and random.random() < 0.1:
        idx1, idx2 = random.sample(range(len(words)), 2)
        words[idx1], words[idx2] = words[idx2], words[idx1]
    return ''.join(words)

# Synonym replacement (requires a synonym dictionary)
def synonym_replacement(text, synonym_dict, n=1):
    """Replace up to n words with synonyms."""
    words = jieba.lcut(text)
    new_words = words.copy()
    candidates = [word for word in words if word in synonym_dict]
    random.shuffle(candidates)
    for word in candidates[:n]:
        synonym = random.choice(synonym_dict[word])
        new_words = [synonym if w == word else w for w in new_words]
    return ''.join(new_words)
```
3. Regularization

```python
class RegularizedLSTM(nn.Module):
    """LSTM model with several forms of regularization."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5,
                 embedding_dropout=0.3, weight_dropout=0.2, pad_idx=0):
        super(RegularizedLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        # Dropout on the embeddings
        self.embedding_dropout = nn.Dropout(embedding_dropout)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )
        fc_input_dim = hidden_dim * 2 if bidirectional else hidden_dim
        # Layer normalization
        self.layer_norm = nn.LayerNorm(fc_input_dim)
        self.fc = nn.Linear(fc_input_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        # Weight initialization
        self._init_weights()

    def _init_weights(self):
        """Initialize the weights."""
        for name, param in self.lstm.named_parameters():
            if 'weight_ih' in name:
                nn.init.xavier_uniform_(param)
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)
        nn.init.xavier_uniform_(self.fc.weight)
        nn.init.zeros_(self.fc.bias)

    def forward(self, text):
        embedded = self.embedding_dropout(self.embedding(text))
        output, (hidden, cell) = self.lstm(embedded)
        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]
        hidden = self.layer_norm(hidden)
        hidden = self.dropout(hidden)
        output = self.fc(hidden)
        return output
```
4. Learning-Rate Scheduling

```python
import numpy as np

class WarmupCosineScheduler:
    """Warmup followed by cosine annealing."""

    def __init__(self, optimizer, warmup_epochs, total_epochs, min_lr=1e-6):
        self.optimizer = optimizer
        self.warmup_epochs = warmup_epochs
        self.total_epochs = total_epochs
        self.min_lr = min_lr
        self.base_lrs = [group['lr'] for group in optimizer.param_groups]
        self.current_epoch = 0

    def step(self):
        self.current_epoch += 1
        if self.current_epoch <= self.warmup_epochs:
            # Warmup phase: linear increase
            scale = self.current_epoch / self.warmup_epochs
        else:
            # Cosine annealing phase
            progress = (self.current_epoch - self.warmup_epochs) / (self.total_epochs - self.warmup_epochs)
            scale = 0.5 * (1 + np.cos(np.pi * progress))
        for param_group, base_lr in zip(self.optimizer.param_groups, self.base_lrs):
            param_group['lr'] = max(base_lr * scale, self.min_lr)

    def get_lr(self):
        return [group['lr'] for group in self.optimizer.param_groups]
```
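As a quick sanity check on the schedule's shape, the per-epoch scale can be computed standalone. This is a sketch assuming 3 warmup epochs, 10 total epochs, a base learning rate of 1e-3, and a floor of 1e-6 (the same formula as the scheduler above, without the optimizer plumbing):

```python
import numpy as np

# Warmup + cosine schedule: linear ramp for 3 epochs, cosine decay after.
base_lr, min_lr = 1e-3, 1e-6
warmup, total = 3, 10
lrs = []
for epoch in range(1, total + 1):
    if epoch <= warmup:
        scale = epoch / warmup                        # linear warmup
    else:
        progress = (epoch - warmup) / (total - warmup)
        scale = 0.5 * (1 + np.cos(np.pi * progress))  # cosine decay
    lrs.append(max(base_lr * scale, min_lr))
print([f"{lr:.2e}" for lr in lrs])
```

The learning rate climbs to the base value by epoch 3, then decays smoothly and bottoms out at the `min_lr` floor in the final epoch.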
5. Handling Class Imbalance

```python
from torch.utils.data import WeightedRandomSampler

def get_balanced_sampler(labels):
    """Build a class-balanced sampler."""
    class_counts = np.bincount(labels)
    class_weights = 1.0 / class_counts
    sample_weights = [class_weights[label] for label in labels]
    sampler = WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(sample_weights),
        replacement=True
    )
    return sampler

def get_class_weights(labels):
    """Compute class weights for the loss function."""
    class_counts = np.bincount(labels)
    total = sum(class_counts)
    class_weights = total / (len(class_counts) * class_counts)
    return torch.tensor(class_weights, dtype=torch.float)

# Focal Loss for class imbalance
class FocalLoss(nn.Module):
    """Focal Loss for imbalanced classification."""

    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        return focal_loss
```
Complete Code Example

Below is a complete, directly runnable example:

```python
"""
A complete LSTM-based Chinese sentiment analysis implementation.
"""
import os
import random
import numpy as np
import pandas as pd
import jieba
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from collections import Counter
from tqdm import tqdm

# ==================== Configuration ====================
class Config:
    # Random seed
    seed = 42
    # Data
    max_len = 128
    min_freq = 2
    max_vocab_size = 50000
    # Model
    embedding_dim = 256
    hidden_dim = 128
    n_layers = 2
    bidirectional = True
    dropout = 0.5
    output_dim = 2
    # Training
    batch_size = 64
    learning_rate = 1e-3
    epochs = 15
    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def set_seed(seed):
    """Set the random seeds."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# ==================== Data Handling ====================
def build_vocab(texts, min_freq=2, max_vocab_size=50000):
    """Build a vocabulary."""
    word_counts = Counter()
    for text in texts:
        words = jieba.lcut(text)
        word_counts.update(words)
    words = [word for word, count in word_counts.most_common(max_vocab_size)
             if count >= min_freq]
    special_tokens = ['<PAD>', '<UNK>']
    words = special_tokens + words
    word2idx = {word: idx for idx, word in enumerate(words)}
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word

class SentimentDataset(Dataset):
    """Sentiment analysis dataset."""

    def __init__(self, texts, labels, word2idx, max_len=128):
        self.texts = texts
        self.labels = labels
        self.word2idx = word2idx
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        words = jieba.lcut(text)
        indices = [self.word2idx.get(word, self.word2idx['<UNK>']) for word in words]
        if len(indices) > self.max_len:
            indices = indices[:self.max_len]
        else:
            indices = indices + [self.word2idx['<PAD>']] * (self.max_len - len(indices))
        return torch.tensor(indices, dtype=torch.long), torch.tensor(label, dtype=torch.long)

# ==================== Model ====================
class LSTMClassifier(nn.Module):
    """LSTM sentiment classifier."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5, pad_idx=0):
        super(LSTMClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )
        fc_input_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(fc_input_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))
        output, (hidden, cell) = self.lstm(embedded)
        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]
        hidden = self.dropout(hidden)
        return self.fc(hidden)

# ==================== Training and Evaluation ====================
def train_epoch(model, dataloader, optimizer, criterion, device):
    """Train for one epoch."""
    model.train()
    total_loss = 0
    all_preds, all_labels = [], []
    for texts, labels in tqdm(dataloader, desc="Training"):
        texts, labels = texts.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item()
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
    return total_loss / len(dataloader), accuracy_score(all_labels, all_preds)

def evaluate(model, dataloader, criterion, device):
    """Evaluate the model."""
    model.eval()
    total_loss = 0
    all_preds, all_labels = [], []
    with torch.no_grad():
        for texts, labels in tqdm(dataloader, desc="Evaluating"):
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return total_loss / len(dataloader), accuracy_score(all_labels, all_preds), all_preds, all_labels

def predict(model, text, word2idx, device, max_len=128):
    """Predict the sentiment of a single text."""
    model.eval()
    words = jieba.lcut(text)
    indices = [word2idx.get(word, word2idx['<UNK>']) for word in words]
    if len(indices) > max_len:
        indices = indices[:max_len]
    else:
        indices = indices + [word2idx['<PAD>']] * (max_len - len(indices))
    input_tensor = torch.tensor([indices], dtype=torch.long).to(device)
    with torch.no_grad():
        output = model(input_tensor)
        prob = torch.softmax(output, dim=1)
        pred = torch.argmax(prob, dim=1).item()
    return pred, prob[0].cpu().numpy()

# ==================== Main ====================
def main():
    # Set the random seed
    config = Config()
    set_seed(config.seed)
    print(f"Using device: {config.device}")

    # Build a toy dataset (replace with real data in practice)
    sample_data = {
        'text': [
            "这部电影太棒了,剧情精彩,演员演技出色!",
            "非常失望,完全是浪费时间,不推荐观看。",
            "服务态度很好,菜品也很美味,下次还会来!",
            "质量太差了,用了一天就坏了,强烈不推荐。",
            "书写得非常好,内容丰富,受益匪浅。",
            "太难用了,界面设计一塌糊涂,卸载了。",
            "性价比很高,物流也很快,满意!",
            "完全是虚假宣传,实物和图片差太多了。",
            "音质很棒,佩戴舒适,非常推荐!",
            "客服态度恶劣,问题一直没解决,差评。",
            "很喜欢这个颜色,穿着也很舒服。",
            "做工粗糙,线头很多,退货了。",
            "老师讲解清晰,课程内容实用,学到很多。",
            "纯粹是骗钱的课程,内容毫无价值。",
            "环境优雅,食物精致,约会好去处。",
            "上菜太慢了,等了一个多小时,体验很差。",
        ] * 50,  # replicate to enlarge the dataset
        'label': [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 50
    }
    df = pd.DataFrame(sample_data)
    texts = df['text'].tolist()
    labels = df['label'].tolist()

    # Build the vocabulary
    print("Building vocabulary...")
    word2idx, idx2word = build_vocab(texts, config.min_freq, config.max_vocab_size)
    vocab_size = len(word2idx)
    print(f"Vocabulary size: {vocab_size}")

    # Split the dataset
    train_texts, test_texts, train_labels, test_labels = train_test_split(
        texts, labels, test_size=0.2, random_state=config.seed, stratify=labels
    )

    # Create the datasets and data loaders
    train_dataset = SentimentDataset(train_texts, train_labels, word2idx, config.max_len)
    test_dataset = SentimentDataset(test_texts, test_labels, word2idx, config.max_len)
    train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=config.batch_size, shuffle=False)

    # Create the model
    model = LSTMClassifier(
        vocab_size=vocab_size,
        embedding_dim=config.embedding_dim,
        hidden_dim=config.hidden_dim,
        output_dim=config.output_dim,
        n_layers=config.n_layers,
        bidirectional=config.bidirectional,
        dropout=config.dropout
    ).to(config.device)

    # Print the parameter counts
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")

    # Loss function, optimizer, and scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2)

    # Train
    print("\nStarting training...")
    best_val_loss = float('inf')
    for epoch in range(config.epochs):
        print(f"\nEpoch {epoch + 1}/{config.epochs}")
        print("-" * 40)
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, config.device)
        val_loss, val_acc, _, _ = evaluate(model, test_loader, criterion, config.device)
        scheduler.step(val_loss)
        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pt')
            print("✓ Best model saved!")

    # Load the best model and evaluate
    print("\n" + "=" * 40)
    print("Final Evaluation")
    print("=" * 40)
    model.load_state_dict(torch.load('best_model.pt'))
    _, test_acc, preds, true_labels = evaluate(model, test_loader, criterion, config.device)
    print(f"\nTest Accuracy: {test_acc:.4f}")
    print("\nClassification Report:")
    print(classification_report(true_labels, preds, target_names=['negative', 'positive']))

    # Single-sentence prediction examples
    print("\n" + "=" * 40)
    print("Prediction Examples")
    print("=" * 40)
    test_sentences = [
        "这个产品真的很好用,强烈推荐!",
        "太糟糕了,再也不会买了。",
        "一般般吧,没有特别好也没有特别差。",
    ]
    for sentence in test_sentences:
        pred, prob = predict(model, sentence, word2idx, config.device)
        sentiment = "positive" if pred == 1 else "negative"
        print(f"\nText: {sentence}")
        print(f"Prediction: {sentiment} (confidence: {prob[pred]:.4f})")

if __name__ == "__main__":
    main()
```
Summary and Outlook

Summary

This article covered an LSTM-based Chinese sentiment analysis implementation end to end:
- Theory: starting from the limitations of RNNs, it explained the LSTM gating mechanism and how it works
- Chinese preprocessing: word segmentation, stopword removal, and text vectorization
- Model implementation: complete implementations of a basic LSTM and an attention-augmented LSTM
- Training and optimization: pretrained word vectors, data augmentation, regularization, and other techniques
- Complete code: a directly runnable end-to-end example

Strengths and Limitations of LSTM

Strengths:
- Captures long-range dependencies effectively
- Greatly mitigates the vanishing gradient problem compared with vanilla RNNs
- Mature architecture with stable training
- Well suited to medium-sized datasets

Limitations:
- Processing is inherently sequential, which limits parallelism
- Very long sequences can still lose information
- Relatively large parameter count

Going Further
- Transformer architectures: self-attention supports parallel computation and generally performs better
- Pretrained language models: BERT, RoBERTa, and similar models excel at sentiment analysis
- Multi-task learning: jointly training on related tasks improves generalization
- Aspect-level sentiment analysis: a finer-grained formulation of the task
- Cross-domain transfer: applying knowledge learned in one domain to another

Recommended Resources

Papers:
- Hochreiter & Schmidhuber (1997). "Long Short-Term Memory"
- Graves (2013). "Generating Sequences With Recurrent Neural Networks"

Open datasets:
- ChnSentiCorp: a Chinese sentiment analysis dataset
- Weibo Sentiment: a Weibo (microblog) sentiment dataset
- Online Shopping Reviews: an e-commerce review dataset

Libraries:
- PyTorch: https://pytorch.org/
- Hugging Face Transformers: https://huggingface.co/
- jieba: https://github.com/fxsjy/jieba

Author's note: this article aims to provide a complete guide to LSTM-based Chinese sentiment analysis. Questions and suggestions are welcome.