A Practical Guide to Chinese Sentiment Analysis with LSTM


Contents

  1. Introduction
  2. Theoretical Background
  3. Chinese Text Preprocessing
  4. Model Construction
  5. Model Training and Evaluation
  6. Model Optimization Techniques
  7. Complete Code Example
  8. Summary and Outlook

Introduction

Sentiment analysis is an important task in natural language processing: it aims to identify and extract subjective information from text and determine the sentiment it expresses (positive, negative, or neutral). It has wide applications in e-commerce review analysis, public-opinion monitoring, social media analysis, and related fields.

Compared with English, Chinese sentiment analysis presents some unique challenges:

  • No space delimiters: Chinese text has no natural separators between words, so word segmentation is required
  • Polysemy: the same word can carry different sentiment in different contexts
  • Indirect expression: Chinese tends to be understated, so judging sentiment often requires more context

This article walks through building a Chinese sentiment analysis model with an LSTM (Long Short-Term Memory) network.


Theoretical Background

Recurrent Neural Networks (RNNs)

Recurrent neural networks are a class of neural networks designed for sequential data. Unlike feed-forward networks, an RNN adds recurrent connections between hidden states, which gives the network a "memory" of earlier inputs.

The basic RNN structure:

        ┌─────────────────────────────────────┐
        │                                     │
        ▼                                     │
    ┌───────┐     ┌───────┐     ┌───────┐    │
    │  h_t  │────▶│ h_t+1 │────▶│ h_t+2 │────┘
    └───────┘     └───────┘     └───────┘
        ▲             ▲             ▲
        │             │             │
    ┌───────┐     ┌───────┐     ┌───────┐
    │  x_t  │     │ x_t+1 │     │ x_t+2 │
    └───────┘     └───────┘     └───────┘

The RNN recurrence:

h_t = \tanh(W_{xh} \cdot x_t + W_{hh} \cdot h_{t-1} + b_h)

y_t = W_{hy} \cdot h_t + b_y

where:

  • x_t is the input at time step t
  • h_t is the hidden state at time step t
  • y_t is the output at time step t
  • W and b are the weight matrices and bias vectors

The limitation of RNNs: vanishing gradients

When processing long sequences, RNNs run into vanishing (or exploding) gradients. As the sequence grows, information from early time steps decays during backpropagation, so the model struggles to learn long-range dependencies.
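This decay can be made concrete with a toy example. The sketch below uses an illustrative scalar RNN with made-up weights (w = 0.5, u = 1.0; not any real model): backpropagating through T steps multiplies the gradient by the per-step factor ∂h_t/∂h_{t-1} = w · tanh′(pre_t), and since every factor here is below 1, the gradient from the first time step all but vanishes after 50 steps.

```python
import math

# Toy scalar RNN: h_t = tanh(w * h_{t-1} + u * x_t).
# Backpropagating through T steps multiplies the gradient by
# dh_t/dh_{t-1} = w * tanh'(pre_t) at every step; when these
# factors are < 1, gradients from early steps decay exponentially.
w, u = 0.5, 1.0
h = 0.0
factors = []
for t in range(50):
    pre = w * h + u * 0.1                          # constant small input
    h = math.tanh(pre)
    factors.append(w * (1 - math.tanh(pre) ** 2))  # dh_t/dh_{t-1}

# The gradient of h_50 w.r.t. h_0 is the product of all 50 factors
grad = 1.0
for f in factors:
    grad *= f

print(f"per-step factor ~ {factors[-1]:.3f}, gradient after 50 steps: {grad:.3e}")
```

With per-step factors around 0.48, fifty steps shrink the gradient by roughly sixteen orders of magnitude, which is exactly the long-range-dependency problem the LSTM's gating is designed to address.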

LSTM Networks in Detail

LSTM (Long Short-Term Memory) is a special RNN architecture that introduces a gating mechanism to mitigate the vanishing-gradient problem, allowing the network to learn long-term dependencies effectively.

Core components of the LSTM:

An LSTM cell contains three gates and a cell state:

                    ┌─────────────────────────────────────────┐
                    │                LSTM Cell                │
                    │                                         │
    c_{t-1} ───────▶│──[×]────────[+]────────────────────────▶ c_t
                    │   ▲          ▲                          │
                    │   │    ┌─────┴─────┐                    │
                    │   │    │           │                    │
                    │   │  [tanh]      [×]◀── i_t (input gate)│
                    │   │    ▲          ▲                     │
                    │   │    │          │                     │
                    │  f_t   └────┬─────┘                     │
                    │(forget gate) │                          │
                    │              │                          │
    h_{t-1} ───────▶│──────────────┼─────────[×]─────────────▶ h_t
                    │              │          ▲               │
    x_t ───────────▶│──────────────┴──────[tanh]◀─[×]◀── o_t │
                    │                       (output gate)     │
                    └─────────────────────────────────────────┘

The three gates:

  1. Forget gate: decides which information to discard from the cell state
     f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f)

  2. Input gate: decides which new information to store in the cell state
     i_t = \sigma(W_i \cdot [h_{t-1}, x_t] + b_i)
     \tilde{C}_t = \tanh(W_C \cdot [h_{t-1}, x_t] + b_C)

  3. Output gate: decides what to output
     o_t = \sigma(W_o \cdot [h_{t-1}, x_t] + b_o)

Cell state update:

C_t = f_t \odot C_{t-1} + i_t \odot \tilde{C}_t

Hidden state update:

h_t = o_t \odot \tanh(C_t)

where \sigma denotes the sigmoid function and \odot denotes element-wise multiplication.

Why does the LSTM mitigate vanishing gradients?

  • The cell state C_t is carried between time steps along a largely linear path, so gradients can flow with little attenuation
  • The forget gate can learn to preserve long-term memory
  • The gating mechanism lets the network selectively remember or forget information
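The gate and state equations above can be traced end to end in a few lines. The following is a minimal scalar sketch (every weight and bias arbitrarily set to 0.5 purely for illustration; a real LSTM applies weight matrices to vectors):

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def lstm_step(x_t, h_prev, c_prev, p):
    """One scalar LSTM step following the gate equations above."""
    f = sigmoid(p['W_f'] * x_t + p['U_f'] * h_prev + p['b_f'])          # forget gate
    i = sigmoid(p['W_i'] * x_t + p['U_i'] * h_prev + p['b_i'])          # input gate
    c_tilde = math.tanh(p['W_c'] * x_t + p['U_c'] * h_prev + p['b_c'])  # candidate state
    o = sigmoid(p['W_o'] * x_t + p['U_o'] * h_prev + p['b_o'])          # output gate
    c = f * c_prev + i * c_tilde   # cell state update: C_t = f ⊙ C_{t-1} + i ⊙ C̃_t
    h = o * math.tanh(c)           # hidden state update: h_t = o ⊙ tanh(C_t)
    return h, c

# Arbitrary toy parameters, all 0.5, just to trace one step
params = {k: 0.5 for k in ['W_f', 'U_f', 'b_f', 'W_i', 'U_i', 'b_i',
                           'W_c', 'U_c', 'b_c', 'W_o', 'U_o', 'b_o']}
h, c = lstm_step(x_t=1.0, h_prev=0.0, c_prev=0.0, p=params)
print(f"h_t = {h:.4f}, c_t = {c:.4f}")
```

Because c_prev is zero here, the new cell state is just the gated candidate i · C̃_t; over many steps it is the forget gate that decides how much of the accumulated state survives.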

Sentiment Analysis Task Overview

Sentiment analysis can be carried out at several levels of granularity:

  • Document level: classify the sentiment of an entire document, e.g. decide whether a movie review is positive or negative
  • Sentence level: classify the sentiment of a single sentence, e.g. judge the sentiment of "这部电影太棒了"
  • Aspect level: identify sentiment toward specific aspects, e.g. the opposite opinions about the camera and the battery in "手机拍照不错但电池不行"

This article focuses on sentence-level sentiment analysis as a binary classification task (positive/negative).


Chinese Text Preprocessing

Chinese text preprocessing is a key step in building a high-quality sentiment analysis model.

Chinese Word Segmentation

Because Chinese has no natural word delimiters, a segmentation tool is needed to split sentences into sequences of words. Commonly used Chinese segmenters include:

  • jieba: lightweight and easy to use; good for general-purpose scenarios
  • pkuseg: high accuracy; popular in academic research
  • HanLP: full-featured; suited to enterprise applications
  • LTP: from Harbin Institute of Technology; suited to deeper NLP tasks

A jieba segmentation example:

import jieba

text = "这部电影的剧情非常精彩,演员演技也很出色!"

# Accurate mode (the default)
words = jieba.lcut(text)
print("Accurate mode:", words)
# Output: ['这部', '电影', '的', '剧情', '非常', '精彩', ',', '演员', '演技', '也', '很', '出色', '!']

# Full mode
words_full = jieba.lcut(text, cut_all=True)
print("Full mode:", words_full)

# Search-engine mode
words_search = jieba.lcut_for_search(text)
print("Search-engine mode:", words_search)

Adding a custom dictionary:

# Add individual custom words
jieba.add_word("演技炸裂")
jieba.add_word("神仙打架")

# Load a custom dictionary file
jieba.load_userdict("custom_dict.txt")

Stopword Removal

Stopwords are words that occur frequently in text but contribute little to sentiment analysis, such as "的", "是", and "了".

Loading a stopword list:

def load_stopwords(filepath):
    """Load a stopword list from a file, one word per line."""
    with open(filepath, 'r', encoding='utf-8') as f:
        stopwords = set(line.strip() for line in f)
    return stopwords

def remove_stopwords(words, stopwords):
    """Filter stopwords out of a word list."""
    return [word for word in words if word not in stopwords]

# Usage example
stopwords = load_stopwords('stopwords.txt')
text = "这部电影的剧情非常精彩"
words = jieba.lcut(text)
filtered_words = remove_stopwords(words, stopwords)
print("After stopword removal:", filtered_words)
# Output: ['电影', '剧情', '非常', '精彩']

Common sources of Chinese stopword lists:

  • the Baidu stopword list
  • the Harbin Institute of Technology stopword list
  • the Sichuan University Machine Intelligence Laboratory stopword list

Text Vectorization

Converting text into numeric vectors is a prerequisite for feeding it to a deep learning model.

1. Building the vocabulary

from collections import Counter

def build_vocab(texts, min_freq=1, max_vocab_size=50000):
    """
    Build a vocabulary.

    Args:
        texts: list of segmented texts; each element is a list of words
        min_freq: minimum word-frequency threshold
        max_vocab_size: maximum vocabulary size

    Returns:
        word2idx: mapping from word to index
        idx2word: mapping from index to word
    """
    # Count word frequencies
    word_counts = Counter()
    for text in texts:
        word_counts.update(text)

    # Keep the most common words above the frequency threshold
    words = [word for word, count in word_counts.most_common(max_vocab_size)
             if count >= min_freq]

    # Add the special tokens
    special_tokens = ['<PAD>', '<UNK>']
    words = special_tokens + words

    # Build the mappings
    word2idx = {word: idx for idx, word in enumerate(words)}
    idx2word = {idx: word for word, idx in word2idx.items()}

    return word2idx, idx2word

2. Text encoding

def encode_texts(texts, word2idx, max_len=128):
    """
    Convert texts into index sequences.

    Args:
        texts: list of segmented texts
        word2idx: mapping from word to index
        max_len: maximum sequence length

    Returns:
        encoded: array of encoded texts
    """
    import numpy as np

    encoded = []
    for text in texts:
        # Map words to indices, falling back to <UNK>
        indices = [word2idx.get(word, word2idx['<UNK>']) for word in text]

        # Truncate or pad to max_len
        if len(indices) > max_len:
            indices = indices[:max_len]
        else:
            indices = indices + [word2idx['<PAD>']] * (max_len - len(indices))

        encoded.append(indices)

    return np.array(encoded)

3. Word embeddings

Word embeddings map discrete words into a continuous vector space in which semantically similar words lie close together.

import torch
import torch.nn as nn

class EmbeddingLayer(nn.Module):
    def __init__(self, vocab_size, embedding_dim, pretrained_embeddings=None):
        super(EmbeddingLayer, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        # Load pretrained word vectors if provided
        if pretrained_embeddings is not None:
            self.embedding.weight.data.copy_(torch.from_numpy(pretrained_embeddings))
            # Optional: freeze the embeddings
            # self.embedding.weight.requires_grad = False

    def forward(self, x):
        return self.embedding(x)

Commonly used pretrained Chinese word vectors:

  • Tencent AI Lab Embedding
  • Beijing Normal University Chinese word vectors
  • fastText pretrained Chinese word vectors

Model Construction

Environment Setup

Required dependencies:

# Create a virtual environment
conda create -n sentiment python=3.9
conda activate sentiment

# Install the dependencies
pip install torch torchvision torchaudio
pip install jieba
pip install numpy pandas scikit-learn
pip install matplotlib seaborn
pip install tqdm

requirements.txt:

torch>=1.9.0
jieba>=0.42.1
numpy>=1.21.0
pandas>=1.3.0
scikit-learn>=0.24.0
matplotlib>=3.4.0
seaborn>=0.11.0
tqdm>=4.61.0

Data Preparation

Dataset format:

Suppose we have a Chinese sentiment dataset in CSV format:

text,label
这家餐厅的菜品非常美味,服务态度也很好!,1
质量太差了,用了一天就坏了,不推荐购买。,0
还行吧,中规中矩,没有特别惊艳的地方。,0
超级喜欢这本书,作者写得太棒了!,1

Data loading and preprocessing classes:

import pandas as pd
import jieba
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class SentimentDataset(Dataset):
    """Chinese sentiment analysis dataset"""

    def __init__(self, texts, labels, word2idx, max_len=128):
        self.texts = texts
        self.labels = labels
        self.word2idx = word2idx
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]

        # Segment the text
        words = jieba.lcut(text)

        # Encode words as indices
        indices = [self.word2idx.get(word, self.word2idx['<UNK>']) for word in words]

        # Truncate or pad to max_len
        if len(indices) > self.max_len:
            indices = indices[:self.max_len]
        else:
            indices = indices + [self.word2idx['<PAD>']] * (self.max_len - len(indices))

        return torch.tensor(indices, dtype=torch.long), torch.tensor(label, dtype=torch.long)


def prepare_data(csv_path, test_size=0.2, random_state=42):
    """Prepare the training and test data."""

    # Read the data
    df = pd.read_csv(csv_path)
    texts = df['text'].tolist()
    labels = df['label'].tolist()

    # Build the vocabulary from the segmented texts
    all_words = []
    for text in texts:
        all_words.append(jieba.lcut(text))

    word2idx, idx2word = build_vocab(all_words, min_freq=2)

    # Split into training and test sets
    train_texts, test_texts, train_labels, test_labels = train_test_split(
        texts, labels, test_size=test_size, random_state=random_state, stratify=labels
    )

    # Create the datasets
    train_dataset = SentimentDataset(train_texts, train_labels, word2idx)
    test_dataset = SentimentDataset(test_texts, test_labels, word2idx)

    return train_dataset, test_dataset, word2idx, idx2word

LSTM Model Implementation

A basic LSTM model:

import torch
import torch.nn as nn

class LSTMSentimentClassifier(nn.Module):
    """LSTM-based sentiment classification model"""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5, pad_idx=0):
        super(LSTMSentimentClassifier, self).__init__()

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # LSTM layers
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )

        # Fully connected layer
        fc_input_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(fc_input_dim, output_dim)

        # Dropout layer
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """
        Forward pass.

        Args:
            text: [batch_size, seq_len]

        Returns:
            output: [batch_size, output_dim]
        """
        # Embedding: [batch_size, seq_len, embedding_dim]
        embedded = self.dropout(self.embedding(text))

        # LSTM: output [batch_size, seq_len, hidden_dim * num_directions]
        #       hidden [n_layers * num_directions, batch_size, hidden_dim]
        output, (hidden, cell) = self.lstm(embedded)

        # Concatenate the final-layer hidden states of the bidirectional LSTM
        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)
        else:
            hidden = hidden[-1,:,:]

        # Fully connected layer
        hidden = self.dropout(hidden)
        output = self.fc(hidden)

        return output

An LSTM model with an attention mechanism:

class AttentionLSTM(nn.Module):
    """LSTM sentiment classifier with an attention mechanism"""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5, pad_idx=0):
        super(AttentionLSTM, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )

        # Attention scoring layer
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.attention = nn.Linear(lstm_output_dim, 1)

        self.fc = nn.Linear(lstm_output_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def attention_layer(self, lstm_output, mask=None):
        """
        Attention layer.

        Args:
            lstm_output: [batch_size, seq_len, hidden_dim * num_directions]
            mask: [batch_size, seq_len], masks out padding positions

        Returns:
            context: [batch_size, hidden_dim * num_directions]
            attention_weights: [batch_size, seq_len]
        """
        # Attention scores: [batch_size, seq_len, 1]
        attention_scores = self.attention(lstm_output)
        attention_scores = attention_scores.squeeze(-1)  # [batch_size, seq_len]

        # Apply the mask
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, -1e10)

        # Softmax to obtain the attention weights
        attention_weights = torch.softmax(attention_scores, dim=1)

        # Weighted sum of the LSTM outputs
        context = torch.bmm(attention_weights.unsqueeze(1), lstm_output)
        context = context.squeeze(1)  # [batch_size, hidden_dim * num_directions]

        return context, attention_weights

    def forward(self, text):
        # Embedding
        embedded = self.dropout(self.embedding(text))

        # LSTM
        lstm_output, (hidden, cell) = self.lstm(embedded)

        # Build the mask (0 at padding positions)
        mask = (text != 0).float()

        # Attention
        context, attention_weights = self.attention_layer(lstm_output, mask)

        # Classification
        output = self.fc(self.dropout(context))

        return output, attention_weights

Model hyperparameter configuration:

# Model hyperparameters
class Config:
    # Data
    max_len = 128           # maximum sequence length
    min_freq = 2            # minimum word frequency

    # Model
    embedding_dim = 300     # word embedding dimension
    hidden_dim = 256        # LSTM hidden size
    n_layers = 2            # number of LSTM layers
    bidirectional = True    # use a bidirectional LSTM
    dropout = 0.5           # dropout rate
    output_dim = 2          # number of output classes

    # Training
    batch_size = 64
    learning_rate = 1e-3
    epochs = 20

    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Model Training and Evaluation

Training Loop

import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def train_epoch(model, dataloader, optimizer, criterion, device):
    """Train for one epoch."""
    model.train()
    total_loss = 0
    all_preds = []
    all_labels = []

    progress_bar = tqdm(dataloader, desc="Training")
    for texts, labels in progress_bar:
        texts, labels = texts.to(device), labels.to(device)

        optimizer.zero_grad()

        # Forward pass
        if isinstance(model, AttentionLSTM):
            outputs, _ = model(texts)
        else:
            outputs = model(texts)

        # Compute the loss
        loss = criterion(outputs, labels)

        # Backward pass
        loss.backward()

        # Clip gradients to prevent explosion
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        total_loss += loss.item()
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

        progress_bar.set_postfix({'loss': loss.item()})

    avg_loss = total_loss / len(dataloader)
    accuracy = accuracy_score(all_labels, all_preds)

    return avg_loss, accuracy


def evaluate(model, dataloader, criterion, device):
    """Evaluate the model."""
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for texts, labels in tqdm(dataloader, desc="Evaluating"):
            texts, labels = texts.to(device), labels.to(device)

            if isinstance(model, AttentionLSTM):
                outputs, _ = model(texts)
            else:
                outputs = model(texts)

            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(dataloader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='binary'
    )

    return avg_loss, accuracy, precision, recall, f1


def train(model, train_loader, val_loader, config):
    """Run the full training loop."""

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=2
    )

    best_val_loss = float('inf')
    train_losses, val_losses = [], []
    train_accs, val_accs = [], []

    for epoch in range(config.epochs):
        print(f"\nEpoch {epoch + 1}/{config.epochs}")
        print("-" * 50)

        # Train
        train_loss, train_acc = train_epoch(
            model, train_loader, optimizer, criterion, config.device
        )

        # Validate
        val_loss, val_acc, precision, recall, f1 = evaluate(
            model, val_loader, criterion, config.device
        )

        # Adjust the learning rate
        scheduler.step(val_loss)

        # Record the metrics
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")
        print(f"Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f}")

        # Save the best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pt')
            print("✓ Best model saved!")

    return train_losses, val_losses, train_accs, val_accs

Visualizing the Training Process

import matplotlib.pyplot as plt
import seaborn as sns

def plot_training_history(train_losses, val_losses, train_accs, val_accs):
    """Plot the training curves."""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Loss curves
    axes[0].plot(train_losses, label='Train Loss', marker='o')
    axes[0].plot(val_losses, label='Val Loss', marker='s')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('Training and Validation Loss')
    axes[0].legend()
    axes[0].grid(True)

    # Accuracy curves
    axes[1].plot(train_accs, label='Train Acc', marker='o')
    axes[1].plot(val_accs, label='Val Acc', marker='s')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy')
    axes[1].set_title('Training and Validation Accuracy')
    axes[1].legend()
    axes[1].grid(True)

    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300)
    plt.show()


def plot_confusion_matrix(y_true, y_pred, labels=['Negative', 'Positive']):
    """Plot the confusion matrix."""
    from sklearn.metrics import confusion_matrix

    cm = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=labels, yticklabels=labels)
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.title('Confusion Matrix')
    plt.savefig('confusion_matrix.png', dpi=300)
    plt.show()

Attention Visualization

def visualize_attention(model, text, word2idx, idx2word, device):
    """Visualize the attention weights for a single text."""
    model.eval()

    # Segment and encode the text
    words = jieba.lcut(text)
    indices = [word2idx.get(word, word2idx['<UNK>']) for word in words]

    # Convert to a tensor
    input_tensor = torch.tensor([indices], dtype=torch.long).to(device)

    with torch.no_grad():
        output, attention_weights = model(input_tensor)
        pred = torch.argmax(output, dim=1).item()

    # Extract the attention weights for the actual tokens
    attention = attention_weights[0][:len(words)].cpu().numpy()

    # Plot
    plt.figure(figsize=(12, 3))
    plt.bar(range(len(words)), attention)
    plt.xticks(range(len(words)), words, rotation=45, ha='right')
    plt.xlabel('Word')
    plt.ylabel('Attention weight')
    plt.title(f'Attention distribution (prediction: {"positive" if pred == 1 else "negative"})')
    plt.tight_layout()
    plt.savefig('attention_visualization.png', dpi=300)
    plt.show()

    return pred, attention

Model Optimization Techniques

1. Pretrained Word Vectors

Using pretrained word vectors can significantly improve model performance:

import numpy as np

def load_pretrained_embeddings(embedding_path, word2idx, embedding_dim):
    """Load pretrained word vectors into an embedding matrix."""

    # Initialize the embedding matrix with small random values
    vocab_size = len(word2idx)
    embedding_matrix = np.random.randn(vocab_size, embedding_dim) * 0.01

    # Zero out the PAD vector
    embedding_matrix[0] = np.zeros(embedding_dim)

    # Fill in pretrained vectors for words that appear in the vocabulary
    found_count = 0
    with open(embedding_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.strip().split()
            word = values[0]
            if word in word2idx:
                idx = word2idx[word]
                vector = np.array(values[1:], dtype=np.float32)
                embedding_matrix[idx] = vector
                found_count += 1

    print(f"Found {found_count}/{vocab_size} words in pretrained embeddings")
    return embedding_matrix

2. Data Augmentation

import random

def text_augmentation(text):
    """Simple augmentation: random word deletion and random swap."""
    words = jieba.lcut(text)

    # Random deletion
    if len(words) > 4 and random.random() < 0.1:
        idx = random.randint(0, len(words) - 1)
        words.pop(idx)

    # Random swap
    if len(words) > 2 and random.random() < 0.1:
        idx1, idx2 = random.sample(range(len(words)), 2)
        words[idx1], words[idx2] = words[idx2], words[idx1]

    return ''.join(words)


# Synonym replacement (requires a synonym dictionary)
def synonym_replacement(text, synonym_dict, n=1):
    """Replace up to n words with randomly chosen synonyms."""
    words = jieba.lcut(text)
    new_words = words.copy()

    candidates = [word for word in words if word in synonym_dict]
    random.shuffle(candidates)

    for word in candidates[:n]:
        synonym = random.choice(synonym_dict[word])
        new_words = [synonym if w == word else w for w in new_words]

    return ''.join(new_words)

3. Regularization Techniques

class RegularizedLSTM(nn.Module):
    """LSTM model with several forms of regularization"""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5,
                 embedding_dropout=0.3, weight_dropout=0.2, pad_idx=0):
        super(RegularizedLSTM, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # Dropout on the embeddings
        self.embedding_dropout = nn.Dropout(embedding_dropout)

        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )

        fc_input_dim = hidden_dim * 2 if bidirectional else hidden_dim

        # Layer normalization
        self.layer_norm = nn.LayerNorm(fc_input_dim)

        self.fc = nn.Linear(fc_input_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

        # Weight initialization
        self._init_weights()

    def _init_weights(self):
        """Xavier init for input-hidden weights, orthogonal for hidden-hidden."""
        for name, param in self.lstm.named_parameters():
            if 'weight_ih' in name:
                nn.init.xavier_uniform_(param)
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)

        nn.init.xavier_uniform_(self.fc.weight)
        nn.init.zeros_(self.fc.bias)

    def forward(self, text):
        embedded = self.embedding_dropout(self.embedding(text))
        output, (hidden, cell) = self.lstm(embedded)

        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)
        else:
            hidden = hidden[-1,:,:]

        hidden = self.layer_norm(hidden)
        hidden = self.dropout(hidden)
        output = self.fc(hidden)

        return output

4. Learning-Rate Scheduling

# Warmup + cosine annealing
import numpy as np

class WarmupCosineScheduler:
    """Learning-rate scheduler with linear warmup followed by cosine annealing"""

    def __init__(self, optimizer, warmup_epochs, total_epochs, min_lr=1e-6):
        self.optimizer = optimizer
        self.warmup_epochs = warmup_epochs
        self.total_epochs = total_epochs
        self.min_lr = min_lr
        self.base_lrs = [group['lr'] for group in optimizer.param_groups]
        self.current_epoch = 0

    def step(self):
        self.current_epoch += 1

        if self.current_epoch <= self.warmup_epochs:
            # Warmup phase: linear increase
            scale = self.current_epoch / self.warmup_epochs
        else:
            # Cosine annealing phase
            progress = (self.current_epoch - self.warmup_epochs) / (self.total_epochs - self.warmup_epochs)
            scale = 0.5 * (1 + np.cos(np.pi * progress))

        for param_group, base_lr in zip(self.optimizer.param_groups, self.base_lrs):
            param_group['lr'] = max(base_lr * scale, self.min_lr)

    def get_lr(self):
        return [group['lr'] for group in self.optimizer.param_groups]

5. Handling Class Imbalance

import numpy as np
from torch.utils.data import WeightedRandomSampler

def get_balanced_sampler(labels):
    """Build a sampler that rebalances classes by inverse frequency."""
    class_counts = np.bincount(labels)
    class_weights = 1.0 / class_counts
    sample_weights = [class_weights[label] for label in labels]

    sampler = WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(sample_weights),
        replacement=True
    )
    return sampler


def get_class_weights(labels):
    """Compute class weights for use in the loss function."""
    class_counts = np.bincount(labels)
    total = sum(class_counts)
    class_weights = total / (len(class_counts) * class_counts)
    return torch.tensor(class_weights, dtype=torch.float)


# Use focal loss for class imbalance
class FocalLoss(nn.Module):
    """Focal loss for imbalanced classification"""

    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        return focal_loss
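To see why the (1 - pt)^gamma factor helps, the arithmetic can be checked by hand. Here is a small plain-Python sketch of the same formula (the probabilities 0.9 and 0.1 are made-up illustrative values):

```python
import math

def focal_loss_term(pt, gamma=2.0, alpha=1.0):
    """Per-sample focal loss: alpha * (1 - pt)^gamma * (-log pt)."""
    return alpha * (1.0 - pt) ** gamma * (-math.log(pt))

# A confidently correct sample (pt = 0.9) vs. a hard one (pt = 0.1)
easy = focal_loss_term(0.9)   # modulating factor (1 - 0.9)^2 = 0.01
hard = focal_loss_term(0.1)   # modulating factor (1 - 0.1)^2 = 0.81
print(f"easy: {easy:.4f}, hard: {hard:.4f}")
```

The easy sample's cross-entropy is scaled down by 100x while the hard one keeps most of its loss, so training effort is concentrated on difficult or minority-class examples.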

Complete Code Example

Below is a complete, directly runnable example:

"""
A complete LSTM-based Chinese sentiment analysis implementation
"""

import os
import random
import numpy as np
import pandas as pd
import jieba
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from collections import Counter
from tqdm import tqdm

# ==================== Configuration ====================

class Config:
    # Random seed
    seed = 42

    # Data
    max_len = 128
    min_freq = 2
    max_vocab_size = 50000

    # Model
    embedding_dim = 256
    hidden_dim = 128
    n_layers = 2
    bidirectional = True
    dropout = 0.5
    output_dim = 2

    # Training
    batch_size = 64
    learning_rate = 1e-3
    epochs = 15

    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def set_seed(seed):
    """Set all random seeds for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


# ==================== Data Processing ====================

def build_vocab(texts, min_freq=2, max_vocab_size=50000):
    """Build the vocabulary from raw texts."""
    word_counts = Counter()
    for text in texts:
        words = jieba.lcut(text)
        word_counts.update(words)

    words = [word for word, count in word_counts.most_common(max_vocab_size)
             if count >= min_freq]

    special_tokens = ['<PAD>', '<UNK>']
    words = special_tokens + words

    word2idx = {word: idx for idx, word in enumerate(words)}
    idx2word = {idx: word for word, idx in word2idx.items()}

    return word2idx, idx2word


class SentimentDataset(Dataset):
    """Sentiment analysis dataset"""

    def __init__(self, texts, labels, word2idx, max_len=128):
        self.texts = texts
        self.labels = labels
        self.word2idx = word2idx
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]

        words = jieba.lcut(text)
        indices = [self.word2idx.get(word, self.word2idx['<UNK>']) for word in words]

        if len(indices) > self.max_len:
            indices = indices[:self.max_len]
        else:
            indices = indices + [self.word2idx['<PAD>']] * (self.max_len - len(indices))

        return torch.tensor(indices, dtype=torch.long), torch.tensor(label, dtype=torch.long)


# ==================== Model Definition ====================

class LSTMClassifier(nn.Module):
    """LSTM sentiment classifier"""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, bidirectional=True, dropout=0.5, pad_idx=0):
        super(LSTMClassifier, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )

        fc_input_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(fc_input_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))
        output, (hidden, cell) = self.lstm(embedded)

        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)
        else:
            hidden = hidden[-1,:,:]

        hidden = self.dropout(hidden)
        return self.fc(hidden)


# ==================== Training and Evaluation ====================

def train_epoch(model, dataloader, optimizer, criterion, device):
    """Train for one epoch."""
    model.train()
    total_loss = 0
    all_preds, all_labels = [], []

    for texts, labels in tqdm(dataloader, desc="Training"):
        texts, labels = texts.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    return total_loss / len(dataloader), accuracy_score(all_labels, all_preds)


def evaluate(model, dataloader, criterion, device):
    """Evaluate the model."""
    model.eval()
    total_loss = 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for texts, labels in tqdm(dataloader, desc="Evaluating"):
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return total_loss / len(dataloader), accuracy_score(all_labels, all_preds), all_preds, all_labels


def predict(model, text, word2idx, device, max_len=128):
    """Predict the sentiment of a single text."""
    model.eval()

    words = jieba.lcut(text)
    indices = [word2idx.get(word, word2idx['<UNK>']) for word in words]

    if len(indices) > max_len:
        indices = indices[:max_len]
    else:
        indices = indices + [word2idx['<PAD>']] * (max_len - len(indices))

    input_tensor = torch.tensor([indices], dtype=torch.long).to(device)

    with torch.no_grad():
        output = model(input_tensor)
        prob = torch.softmax(output, dim=1)
        pred = torch.argmax(prob, dim=1).item()

    return pred, prob[0].cpu().numpy()


# ==================== Main ====================

def main():
    # Configuration and random seed
    config = Config()
    set_seed(config.seed)

    print(f"Using device: {config.device}")

    # Create sample data (replace with a real dataset in practice)
    sample_data = {
        'text': [
            "这部电影太棒了,剧情精彩,演员演技出色!",
            "非常失望,完全是浪费时间,不推荐观看。",
            "服务态度很好,菜品也很美味,下次还会来!",
            "质量太差了,用了一天就坏了,强烈不推荐。",
            "书写得非常好,内容丰富,受益匪浅。",
            "太难用了,界面设计一塌糊涂,卸载了。",
            "性价比很高,物流也很快,满意!",
            "完全是虚假宣传,实物和图片差太多了。",
            "音质很棒,佩戴舒适,非常推荐!",
            "客服态度恶劣,问题一直没解决,差评。",
            "很喜欢这个颜色,穿着也很舒服。",
            "做工粗糙,线头很多,退货了。",
            "老师讲解清晰,课程内容实用,学到很多。",
            "纯粹是骗钱的课程,内容毫无价值。",
            "环境优雅,食物精致,约会好去处。",
            "上菜太慢了,等了一个多小时,体验很差。",
        ] * 50,  # replicate to enlarge the dataset
        'label': [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 50
    }

    df = pd.DataFrame(sample_data)
    texts = df['text'].tolist()
    labels = df['label'].tolist()

    # Build the vocabulary
    print("Building vocabulary...")
    word2idx, idx2word = build_vocab(texts, config.min_freq, config.max_vocab_size)
    vocab_size = len(word2idx)
    print(f"Vocabulary size: {vocab_size}")

    # 划分数据集
    train_texts, test_texts, train_labels, test_labels = train_test_split(
        texts, labels, test_size=0.2, random_state=config.seed, stratify=labels
    )

    # 创建数据集和数据加载器
    train_dataset = SentimentDataset(train_texts, train_labels, word2idx, config.max_len)
    test_dataset = SentimentDataset(test_texts, test_labels, word2idx, config.max_len)

    train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=config.batch_size, shuffle=False)

    # 创建模型
    model = LSTMClassifier(
        vocab_size=vocab_size,
        embedding_dim=config.embedding_dim,
        hidden_dim=config.hidden_dim,
        output_dim=config.output_dim,
        n_layers=config.n_layers,
        bidirectional=config.bidirectional,
        dropout=config.dropout
    ).to(config.device)

    # 打印模型参数量
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")

    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2)

    # 训练模型
    print("\nStarting training...")
    best_val_loss = float('inf')

    for epoch in range(config.epochs):
        print(f"\nEpoch {epoch + 1}/{config.epochs}")
        print("-" * 40)

        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, config.device)
        val_loss, val_acc, _, _ = evaluate(model, test_loader, criterion, config.device)

        scheduler.step(val_loss)

        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pt')
            print("✓ Best model saved!")

    # 加载最佳模型并测试
    print("\n" + "=" * 40)
    print("Final Evaluation")
    print("=" * 40)

    model.load_state_dict(torch.load('best_model.pt', map_location=config.device))
    _, test_acc, preds, true_labels = evaluate(model, test_loader, criterion, config.device)

    print(f"\nTest Accuracy: {test_acc:.4f}")
    print("\nClassification Report:")
    print(classification_report(true_labels, preds, target_names=['负面', '正面']))

    # 单条预测示例
    print("\n" + "=" * 40)
    print("Prediction Examples")
    print("=" * 40)

    test_sentences = [
        "这个产品真的很好用,强烈推荐!",
        "太糟糕了,再也不会买了。",
        "一般般吧,没有特别好也没有特别差。",
    ]

    for sentence in test_sentences:
        pred, prob = predict(model, sentence, word2idx, config.device)
        sentiment = "正面" if pred == 1 else "负面"
        print(f"\n文本: {sentence}")
        print(f"预测: {sentiment} (置信度: {prob[pred]:.4f})")


if __name__ == "__main__":
    main()
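
补充一点部署注意事项:上面的脚本只保存了模型权重(best_model.pt),而推理阶段的 `predict` 还依赖训练时构建的 `word2idx` 词表——词表不一致会导致索引映射错乱。下面是一个用 pickle 持久化词表的最小示例(`vocab.pkl` 文件名与示例词表均为本文假设,实际应保存 `build_vocab` 返回的完整词表):

```python
import pickle

# 训练结束后保存词表(实际使用时传入 build_vocab 返回的 word2idx)
word2idx = {'<PAD>': 0, '<UNK>': 1, '电影': 2}  # 示例词表,仅作演示
with open('vocab.pkl', 'wb') as f:
    pickle.dump(word2idx, f)

# 推理阶段:先加载词表,再加载 best_model.pt,保证索引映射与训练一致
with open('vocab.pkl', 'rb') as f:
    loaded = pickle.load(f)

assert loaded == word2idx  # 词表完整还原
```

这样推理脚本无需重新构建词表,也避免了线上数据与训练数据分布不同导致的词表漂移问题。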

总结与展望

本文总结

本文详细介绍了基于LSTM的中文情感分析实现,涵盖了以下核心内容:

  1. 理论基础:从RNN的局限性出发,深入讲解了LSTM的门控机制和工作原理
  2. 中文预处理:介绍了分词、停用词处理、文本向量化等关键步骤
  3. 模型实现:提供了基础LSTM和带注意力机制LSTM的完整实现
  4. 训练优化:分享了预训练词向量、数据增强、正则化等多种优化技巧
  5. 完整代码:提供了可直接运行的完整示例代码

LSTM的优势与局限

优势:

  • 能够有效捕捉长距离依赖
  • 相比传统RNN,梯度消失问题大大缓解
  • 模型结构成熟,训练稳定
  • 适合中等规模数据集

局限:

  • 序列处理仍然是顺序的,难以并行化
  • 对于超长序列,仍可能丢失信息
  • 参数量相对较大
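
针对"超长序列仍可能丢失信息"这一局限,一种常见的缓解手段是首尾截断:同时保留文本开头和结尾的 token,因为评论类文本的情感信号往往集中在首尾。下面是一个纯 Python 的示意实现(`head=96` 的切分比例是本文假设的经验值,并非唯一选择):

```python
def head_tail_truncate(indices, max_len=128, head=96):
    """超长序列首尾截断:保留前 head 个与后 (max_len - head) 个 token。

    假设情感信号常集中在文本开头(总体评价)与结尾(结论性表态),
    相比单纯截断开头,能减少结尾信息的丢失。
    """
    if len(indices) <= max_len:
        return indices
    tail = max_len - head
    return indices[:head] + indices[-tail:]

seq = list(range(200))
out = head_tail_truncate(seq, max_len=128, head=96)
# out 长度为 128:前 96 个来自序列开头,后 32 个来自序列结尾
```

在 `SentimentDataset` 或 `predict` 中,可以用它替换简单的 `indices[:max_len]` 截断逻辑。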

进阶方向

  1. Transformer架构:基于自注意力机制,支持并行计算,效果更好
  2. 预训练语言模型:BERT、RoBERTa等模型在情感分析任务上表现优异
  3. 多任务学习:同时学习多个相关任务,提升模型泛化能力
  4. 方面级情感分析:更细粒度的情感分析任务
  5. 跨领域迁移:将一个领域学到的知识迁移到其他领域

推荐资源

论文:

  • Hochreiter & Schmidhuber (1997). “Long Short-Term Memory”
  • Graves (2013). “Generating Sequences With Recurrent Neural Networks”

开源数据集:

  • ChnSentiCorp: 中文情感分析数据集
  • Weibo Sentiment: 微博情感数据集
  • Online Shopping Reviews: 电商评论数据集

工具库:

  • PyTorch: https://pytorch.org/
  • Hugging Face Transformers: https://huggingface.co/
  • jieba: https://github.com/fxsjy/jieba

作者注:本文旨在提供一个完整的LSTM中文情感分析实现指南。如有问题或建议,欢迎讨论交流。
