目录

1 引言

2 卷积神经网络

2.1 核心机制​

2.1.1 局部感知

2.1.2 参数共享

2.1.3 层级特征提取

2.2 关键组件

2.2.1 卷积层

2.2.2 池化层

2.2.3 全连接层

2.2.4 激活函数

3 时间序列预测中的卷积神经网络

3.1 时序卷积网络

3.1.1 一维卷积

3.1.2 因果卷积

3.1.2 膨胀卷积

3.1.3 MICN

4 CNN、TCN、MICN在电力预测中的性能对比

5 总结


1 引言

卷积神经网络(Convolutional Neural Networks,CNN)凭借局部感知、权值共享和层次化特征提取能力,成为计算机视觉与序列建模的核心工具。随着时序预测任务复杂度提升,传统CNN在长程依赖建模中显露出感受野受限问题。时序卷积网络(Temporal Convolution Networks,TCN)通过因果卷积与膨胀卷积改进长程依赖处理能力,而多尺度等距卷积网络(Multi-scale Isometric Convolution Network,MICN)进一步融合局部与全局上下文,实现线性复杂度下的高精度预测,成为长期序列预测的新范式。

2 卷积神经网络

1980年,日本科学家Fukushima K.首次系统性提出了包含局部连接机制、层级特征抽取策略和平移不变性原理的基础架构,通过仿生模拟生物视觉皮层组织架构构建的识别机制,构成了CNN的原型基础。经过最近几年的融合和创新,卷积神经网络得到了进一步的发展。它是在人工神经网络的基础上发展起来的一种典型的深度学习算法,是一种包含卷积计算且具有深度结构的前馈网络。它的前馈性神经网络可以实现神经元之间的相互响应,从而实现复杂的图像处理功能。不同于传统神经网络,CNN首次引入了卷积层和池化层,通过其独特的结构设计,显著提升了处理复杂问题的能力。图2.1是其结构模型。

图2.1 卷积神经网络的结构模型

2.1 核心机制

CNN的核心机制围绕局部感知、参数共享和层级特征提取三大原则展开,这些机制使其在处理网格结构数据(如图像、音频、时序信号)时展现出远超传统神经网络的效率和性能。

2.1.1 局部感知

在人体的视觉系统中,每个神经元仅对视野中的局部区域较为敏感,这个区域被称为“感受野”。卷积神经网络的设计参考了这一点,摒弃了传统神经网络的输入层与隐藏层的全连接,让卷积层的每个神经元只与输入数据的局部区域连接,而非全连接。例如,在处理一张图像时,某个卷积层的神经元可能只关注图像左上角的3×3像素区域,检测其中的边缘特征;而另一个神经元则专门分析图像中央的局部区域,识别纹理模式。这种设计聚焦于局部特征(如边缘、纹理),符合视觉数据的局部相关性规律,并且减少了冗余计算。

图2.2 局部感知

在卷积神经网络中,随着网络层数的加深,每个神经元的感受野会逐渐扩大。如图2.2所示,当所有层的卷积核大小均为3×3且步长为1时,Layer2的每个神经元能够覆盖Layer1中3×3的区域,而Layer3的神经元通过叠加两层卷积操作,其感受野进一步扩展至Layer1上5×5的范围。这种层级结构使得深层神经元能够整合更大范围的输入信息,从而捕捉更复杂的特征。图中绿色区域标注了Layer2神经元的局部视野,黄色区域则展示了Layer3神经元对应的更大范围的输入关联。

2.1.2 参数共享

参数共享是CNN实现高效计算的关键创新。与传统神经网络中每个连接都具有独立权重不同,CNN的卷积核(filter)在整个输入数据上滑动时使用相同的权重参数。例如检测水平边缘的2×2卷积核,无论扫描到图像的哪个位置都使用相同的4个权重值,如图2.3所示:

图2.3 参数共享

权值共享它通过让同一卷积核在图像的所有位置上滑动检测相同特征,不仅显著减少了模型参数,还实现了特征检测的位置不变性——即无论目标特征出现在图像左上角还是右下角,都能被同一组卷积核识别。多个卷积核并行工作,各自专注于提取不同类型的局部特征,这些特征图最终通过全连接层整合,将分布式特征表示转化为高层语义判断。这种设计既保留了局部特征的几何信息,又通过层级抽象实现了端到端的模式识别。

2.1.3 层级特征提取

CNN 通过多层卷积与池化操作的堆叠,实现对数据从低级到高级的层级特征提取。底层卷积层通常学习边缘、纹理、明暗变化等基础局部特征,例如在处理图像时,第一层可能检测水平边缘、垂直边缘或斑点;中层网络则将底层特征组合成更复杂的组件,如图像中的角、轮廓或局部结构;高层网络进一步整合这些组件,形成抽象的语义特征,例如在人脸识别任务中,高层可能学习到“眼睛”“鼻子”等器官特征,最终通过全连接层输出“人脸”的整体判断。这种层级递进的特征提取方式,模拟了人类视觉系统从感知局部细节到理解整体语义的过程,使模型能逐步捕捉数据的本质规律。对于时序信号(如音频、传感器数据),层级特征提取则表现为从原始波形中先提取短时域的频率特征,再组合成旋律、节奏等高级模式,因此CNN也被广泛应用于语音识别、时间序列预测等领域。

2.2 关键组件

卷积神经网络的性能不仅依赖核心机制,还取决于各组件的协同作用。从卷积层到池化层,每个模块都在特征处理中承担特定角色,共同实现高效的特征学习。

2.2.1 卷积层

卷积层是 CNN 实现特征提取的核心层,其核心操作是通过卷积核对输入数据进行滑动卷积计算。卷积核是可学习的参数矩阵,在输入数据上按照设定的步长(stride)滑动,每滑动到一个位置,便与该位置对应的局部区域进行元素相乘再求和,得到一个特征值,这些特征值共同构成输出的特征图(feature map)。

为了提取更丰富的特征,卷积层通常会包含多个不同的卷积核,每个卷积核专注于捕捉一种特定的特征,例如有的卷积核擅长捕捉边缘特征,有的则专注于纹理特征。通过多个卷积核的并行计算,卷积层能够生成多通道的特征图,这些通道相互独立又相互补充,为后续的特征处理提供了丰富的原始素材。

当希望卷积操作后输出特征图的尺寸与输入数据保持一致或接近时,可以通过填充(padding)机制来实现。无填充时,卷积核只能从输入图像的左上角开始滑动,每次严格覆盖输入区域(不超出边界)。边界像素仅被卷积核覆盖一次,中间像素会被卷积核以滑动步幅为1时覆盖多次(例如,3×3卷积核会使中心像素被计算9次)。填充的具体操作是在输入数据的四周添加若干层零值像素,使得卷积核在滑动过程中能够覆盖到输入数据的边缘区域,从而避免因卷积核滑动导致边缘信息被过度忽略。

假设输入图像大小是input x input,卷积核大小是 kernel,步长为stride,补0的圈数为padding,则padding = (kernel - 1)/2。如图2.4所示,对于一个3×3 的卷积核,padding=1,在输入数据的每一边添加1个像素的零填充,扩展为5×5输入矩阵。使得这种策略不仅有助于保持空间分辨率,还能增强模型对图像边界特征的提取能力,在图像分割、超分辨率重建等任务中尤为重要。

图2.4 padding过程示意图

假设输入图像大小是input x input,卷积核大小是kernel,补0的圈数为padding,步长为stride,卷积后输出特征图大小为output x output,output的计算公式如下:

\text{output}=\frac{\text{input}-\text{kernel}+2\times\text{padding}}{\text{stride}}+1\quad

图像高度和宽度的计算公式如下:

\text{High}_{2}=\frac{\text{High}_{1}-\text{kernel}+2\times\text{padding}}{\text{stride}}+1\quad

\text{Width}_{2}=\frac{\text{Width}_{1}-\text{kernel}+2\times\text{padding}}{\text{STD}}+1\quad

若输入图像为 m x n,想要转换为 m x m 或 n x n 的图像,则可以采用裁剪、拼接、填充的方法。

2.2.2 池化层

池化层是深度学习神经网络的重要组成部分,通常位于卷积层之后,用于对特征图进行下采样。该层通过缩减输入数据的空间维度来降低计算量,同时增强模型的泛化能力。池化操作能够保留关键特征,抑制噪声干扰,从而提升网络对平移和形变的鲁棒性。常见的池化方式包括最大池化和平均池化,分别通过选取局部区域的最大值或平均值来实现特征降维,如图2.5所示。

  • 图2.5 最大池化和平均池化

2.2.3 全连接层

全连接层位于网络末端,起到一个“分类器”的作用。该层通过密集连接整合全局信息,连接着上一层的所有节点,用于综合前面神经网络中所提取到的全部特征信息,将它们映射到样本标记空间。

在卷积神经网络中,全连接层的输出值一般是通过Softmax分类器进行分类得到最终的输出。Softmax分类器的主要目的是将输出的每个神经元直接映射到(0-1)区间内的某个实数,并且保证这些映射的实数之和为 1,函数如式2.4所示:

\sigma(z)_{i}=\frac{e^{z_{i}}}{\sum_{j=1}^{K} e^{z_{j}}} \quad (i=1,2,\ldots,K) \quad

其中,z=[z_{1},z_{2},\ldots,z_{K}]表示全连接层的输出向量(K为类别总数);

\sigma(z)_{i}表示第i个类别的预测概率;

指数函数e^{z_{i}}将负值转换为正数,并通过归一化(分母项)放大不同类别间的分数差异。

Softmax的本质是将分类置信度转化为概率分布,使得模型输出可直接解释为各类别可能性,从而支持交叉熵损失函数计算与分类决策。

2.2.4 激活函数

激活函数在卷积神经网络中起着关键作用,通过引入非线性变换模拟生物神经元的激活机制。这种非线性特性使网络能够学习复杂的视觉模式,若仅使用线性激活函数,多层卷积将退化为单层线性变换,无法有效提取图像中的边缘、纹理等高阶特征。非线性激活通过逐层打破线性约束,逐步构建具有语义表征能力的特征空间。在正向传播过程中,这种机制增强了特征的表征能力;同时其可微性确保了反向传播时梯度的有效传递,从而优化网络参数的更新方向。这种非线性建模能力突破了传统线性方法的局限,使深度网络能够学习复杂的特征表示并实现精确的分类决策。图2.5展示了常见的激活函数类型及其特性。

(a)Sigmoid

(b)Tanh

(c)ReLU

(d)LeakyReLU

图2.6 常见激活函数

图2.6展示了四种典型的激活函数:(a)Sigmoid函数,(b)双曲正切函数(Tanh),(c)修正线性单元(ReLU),(d)带泄露修正线性单元(LReLU)。通过对比分析可见,Sigmoid和Tanh函数在输入值较大时会出现梯度饱和现象,其导数趋近于零,这会导致反向传播过程中梯度消失问题。相比之下,ReLU激活函数具有显著优势:首先,其前向传播仅需简单的阈值比较运算,计算复杂度远低于需要指数运算的Sigmoid类函数;其次,在非饱和区域保持恒为1的梯度值,有效缓解了深层网络训练中的梯度消失问题。正是这些特性使ReLU成为当前卷积神经网络中最主流的激活函数选择。

3 时间序列预测中的卷积神经网络

时间序列数据因其显著的时序依赖性和动态演化特征,对传统卷积神经网络提出了特殊挑战。这类数据具有三个核心特性:时间局部性表现为相邻时间点的观测值往往存在强相关性,多尺度模式体现在不同时间尺度下呈现出差异化的周期和趋势特征,而非平稳性则意味着数据的统计特性会随时间发生系统性变化。这些特性共同构成了时间序列分析的复杂性基础。

传统CNN的固定感受野难以适应时间序列数据的核心特性,主要表现在其等距卷积核无法有效捕捉变长周期模式,标准卷积操作缺乏对时序方向性的显式建模,且池化操作可能破坏关键的时间动态信息。这些局限性导致传统CNN在处理多尺度、非平稳的时序数据时,难以兼顾局部细节与全局依赖关系。

3.1 时序卷积网络

时序卷积网络(Temporal Convolutional Network, TCN)是针对时间序列预测任务设计的专用卷积架构,通过因果卷积(Causal Convolution)和膨胀卷积(Dilated Convolution)的协同设计,解决了传统CNN在时序建模中的关键缺陷。其核心思想是通过显式的时间约束和高效的长程依赖捕捉,实现对复杂时序模式的建模。

3.1.1 一维卷积

一维卷积(1D Convolution)是卷积神经网络处理时间序列、文本、音频等一维数据的核心运算方式。其核心思想是通过滑动卷积核在输入序列上进行局部加权求和,提取不同时间尺度下的特征模式。输入数据通常为一维序列x = [x_{1},x_{2},\dots,x_{T}],其中T为序列长度,而卷积核则是一个可学习的权重向量w = [w_{1},w_{2},\dots,w_{T}],K为卷积核大小。计算时,卷积核在输入序列上滑动,每个位置的输出特征y_{t}由局部区域的加权求和得到,公式为:

y_{t}=\sum_{k=1}^{K} w_{k} \cdot x_{t+k-1} + b

这种操作通过局部感知和权值共享机制,高效提取时序特征,同时显著减少参数量。

在实际计算中,一维卷积通过局部感知和权值共享机制高效提取时序特征。局部感知使得每个输出点仅依赖输入序列的局部区域,适用于时间序列的短时相关性分析,如振动信号的瞬时特征检测。权值共享则确保同一卷积核在整个序列上滑动时使用相同的权重,大幅减少参数量,同时保证特征检测的平移不变性,例如在语音识别中,无论音素出现在序列的哪个位置,都能被稳定识别。此外,通过多层堆叠,一维卷积网络能够实现层级特征提取,浅层网络捕捉短时模式(如脉冲或频谱变化),而深层网络则组合这些低级特征,形成更高级的语义表示,如故障趋势或语音音节结构。

一维卷积的计算过程可以通过填充(Padding)和步长(Stride)灵活调整输出特征的长度和分辨率。填充通过在输入序列两端补零来控制输出长度,而步长则决定卷积核滑动的间隔,较大的步长会降低输出序列的分辨率。例如,对于长度为5的输入序列,若使用大小为3的卷积核,无填充且步长为1时,输出长度为3;而若采用步长为2,则输出长度缩短为2。这些策略在具体任务中具有不同的适用性,例如在需要保留时序细节的任务中,通常采用较小的步长和适当的填充,而在计算资源受限的场景下,较大的步长可以降低计算复杂度。

尽管一维卷积在时序数据处理中表现出色,但仍存在一定的局限性。其局部感知机制虽然能有效捕捉短时模式,但对长程依赖关系的建模能力较弱,例如远距离事件之间的因果关系可能无法被标准的一维卷积捕获。此外,池化操作(如MaxPool1d)虽然能降低计算量并增强特征的鲁棒性,但也可能模糊关键的瞬态信号,如故障诊断中的突发性冲击。这些局限性促使研究者提出更高级的时序卷积结构,如时序卷积网络(TCN)和多尺度等距卷积网络(MICN),通过引入因果卷积、膨胀卷积和多尺度特征融合等机制,进一步提升模型对复杂时序模式的建模能力。

3.1.2 因果卷积

因果卷积(Causal Convolution)是 TCN 处理时序数据的基础机制,其核心作用是确保预测的因果性—— 即模型在时刻t的输出仅依赖于时刻t及之前的输入数据,完全排除未来信息的干扰,这与时间序列“过去影响未来”的本质特性高度契合。

因果卷积的核心在于严格限制模型只能利用当前时刻及历史数据进行计算,从而保证时序预测的因果性。具体来说,在时刻t进行卷积运算时,卷积核仅覆盖时间窗口[t-k+1, t]内的输入数据,其中k表示卷积核大小。因果卷积的定义如下:

y_t = \sum_{k=1}^{K} w_k \cdot x_{t-k+1}, \text{where } t-k+1 \geq 1

其中:

K为卷积核大小,

w_k为可学习权重,

x_{t-k+1}表示历史输入。该设计通过两项约束实现因果性:

  1. 时间下标约束:当时(即卷积核试图访问未来时刻),强制= 0。例如,对于K=3的卷积核,计算=2时刻输出时,x0项因不满足2−3+1≥1而被置零。
  2. 左填充策略:实际实现中,通过向输入序列左侧填充K−1个零值,确保输出序列与输入序列长度一致。

在因果卷积中,输出序列的每个位置仅依赖于输入序列中当前位置及之前的位置,具体而言,对于输入序列索引范围从0到input_length—1,输出序列的第i个元素由输入索引0到i决定。为实现此特性并维持输入输出序列长度一致,需在输入张量左侧进行零填充,填充长度通常为kernel_size—1。这种填充确保严格因果性,即输出序列的最后一个元素仅依赖输入序列的最后一个元素,而倒数第二个输出元素则依赖输入倒数第二个元素及其之前元素,以此类推。该机制在时序建模中至关重要,尤其在自回归生成任务中,有效避免未来信息泄漏。

图3.1 因果卷积填充

从上图可以看出,通过2个条目的左零填充,我们可以在遵守因果关系规则的同时实现相同的输出长度。事实上,在没有膨胀的情况下,维持输入长度所需的零填充条目数始终等于kernel_size – 1。

3.1.2 膨胀卷积

膨胀卷积(Dilated Convolution),亦称扩张卷积,作为卷积神经网络(CNN)中一种重要的结构变体,其核心机制在于通过在卷积核采样点间引入可控的间隔(膨胀率)。若在卷积核的每两个元素之间插入d-1个空洞,卷积核的有效大小为:

k^{\prime}=k+(k-1)\times(d-1)

其中,d称为膨胀率(Dilation Rate).当d=1时卷积核为普通的卷积核.

图3.2给出了膨胀率为2时的膨胀卷积示例。该设计显著扩大了特征图的有效感受野,其核心优势在于无需额外增加网络参数量或显著提升计算复杂度,即可实现对更大范围上下文信息的捕获。

图3.2 膨胀卷积

膨胀卷积中,卷积核元素间的间隔具备可调性。通过动态调整该参数,可在不增加计算负载的前提下扩展感受野,这一特性使其在广域上下文建模任务中效能显著,如高分辨率语音识别及图像语义分割等密集预测场景。

标准卷积的操作是对局部区域进行加权求和,而膨胀卷积则在卷积核内的每两个相邻元素之间插入空洞,使得感受野扩大,但计算量和参数量保持不变。

对于标准卷积,输入特征图X \in \mathbb{R}^{H \times W \times C_{in}},卷积核K \in \mathbb{R}^{k \times k \times C_{in} \times C_{out}},输出为:

Y[i,j,c_{\text{out}}] = \sum_{m=0}^{k-1} \sum_{n=0}^{k-1} \sum_{c_{\text{in}}=0}^{C_{\text{in}}-1} X[i+m,j+n,c_{\text{in}}] \cdot K[m,n,c_{\text{in}},c_{\text{out}}]

膨胀卷积的操作在标准卷积的基础上进行修改,通过在卷积核中的每个相邻元素之间插入空洞,使得卷积核的感受野扩大。假设膨胀率为d,那么膨胀卷积的输出为:

Y[i,j,c_{\text{out}}] = \sum_{m=0}^{k-1} \sum_{n=0}^{k-1} \sum_{\begin{subarray}{c} c_{in}=0 \end{subarray}}^{C_{in}-1} X[i+m \cdot d, j+n \cdot d, c_{in}] \cdot K[m,n,c_{in},c_{\text{out}}]

膨胀卷积的核心思想是通过引入膨胀率(dilation rate),使卷积核在计算时能够间隔采样输入特征图上的像素。这种设计在不改变卷积核物理尺寸或输入特征图大小的前提下,显著扩展了卷积核的有效感受野。其关键优势在于,感受野的扩大无需增加模型的参数量和计算复杂度。

图3.2 膨胀解决扩展感受野

预测模型的关键要求之一是“全历史覆盖”,即输出中的每个预测值都应充分捕捉其对应时间点之前的全部历史信息。例如,当kernel_size=3时,传统卷积因感受野受限,仅能覆盖3个时间步,难以满足此要求。膨胀卷积通过多层堆叠结构能够逐步、高效地扩展模型的时间依赖范围。如图3.2所示,采用 kernel_size=3的双层一维卷积架构可将顶层的感受野扩展至5个时间步,显著提升模型对长程时序依赖的建模能力。

由于能在保持参数效率和计算效率的前提下扩展感受野、捕捉更广的上下文信息,膨胀卷积特别适用于需要建模长距离依赖关系的任务。这包括时序预测、图像分割、大尺度图像理解以及等场景。

3.1.3 MICN

时序卷积网络(TCN)在建模长程依赖方面取得了显著进展,但在处理具有复杂多尺度周期性和强非平稳性的时间序列时仍面临局限。传统膨胀卷积层级结构采用固定的膨胀率增长模式,难以自适应捕捉数据中变化显著的周期模式,且深层网络可能因信息稀释而丢失局部精细特征。多尺度等距卷积网络(Multi-scale Isometric Convolution Network,MICN)通过并行化地提取并融合不同时间尺度的上下文特征,在保持线性计算复杂度的同时有效解决了这些问题。

MICN的核心创新在于其等距卷积块设计。该模块采用多分支并行结构,通过差异化的膨胀率设置实现多尺度特征提取:局部精细分支使用小卷积核和低膨胀率捕捉短期细节和瞬态模式;中程依赖分支通过中等膨胀率刻画典型周期特征;全局语义分支则利用大膨胀率捕获长期趋势和结构性偏移。这种并行架构使模型能够同时关注从微观细节到宏观趋势的不同层次信息。

各分支特征通过门控融合机制进行自适应整合,该机制动态学习不同尺度特征的重要性权重,实现有选择的信息强化或抑制。通过层级堆叠多个等距卷积块,MICN能够对原始序列进行由浅入深的特征分析与融合,同时保留关键细节信息。这种设计使 MICN在保持线性计算复杂度的前提下,显著提升了对多周期、非平稳序列的预测精度,成为时间序列预测领域的重要基准模型。

4 CNN、TCN、MICN在电力预测中的性能对比

本实验采用ETT(Electricity Transformer Temperature)数据集评估LSTM模型在电力变压器油温预测中的性能。该数据集包含2016年7月至2018年7月期间电力变压器的运行数据,有四个子数据集ETTh1、ETTh2、ETTm1和ETTm2,分别提供小时级和分钟级两种采样频率。核心任务是利用历史窗口的多变量数据预测未来多步的油温(OT)。选择ETTh1数据集的,其小时级采样频率既能捕捉电力变压器运行的短期波动,又能避免分钟级数据带来的过高计算复杂度该任务对变压器状态监测和预防性维护具有重要应用价值。数据集的解释如下表:

字段名​​

含义​​

单位/说明​​

date

时间戳

记录数据的日期和时间

OT(Oil Temperature)

变压器油温

目标变量,需预测

HUFL(High Useful Load)

高有用负荷

反映变压器负载情况

HULL(High Useless Load)

高无用负荷

与设备效率相关的指标

MUFL(Middle Useful Load)

中有用负荷

中等负载水平

MULL(Middle Useless Load)

中无用负荷

次要负载指标

LUFL(Low Useful Load)

低有用负荷

低负载状态

LULL(Low Useless Load)

低无用负荷

辅助监测参数

表4.1 ETT数据集字段解释

完整代码如下:

CNN_Forecast:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import warnings
from bayes_opt import BayesianOptimization
import os

warnings.filterwarnings('ignore')


os.makedirs('CNN', exist_ok=True)

# 加载数据
data = pd.read_csv('ETT/ETTh1.csv')
data['date'] = pd.to_datetime(data['date'])

# 提取时间特征增强模型输入
data['hour'] = data['date'].dt.hour
data['day_of_week'] = data['date'].dt.dayofweek
data['day_of_month'] = data['date'].dt.day
data['month'] = data['date'].dt.month

# 更新特征列
target = 'OT'
features = [col for col in data.columns if col not in [target, 'date', 'HUFL', 'HULL']]

# 数据预处理
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data[features + [target]])
data[features + [target]] = data_scaled

# 创建时间序列样本
def create_dataset(data, window_size, horizon):
    X, y = [], []
    for i in range(len(data) - window_size - horizon):
        X.append(data[i:i + window_size, :-1])  # 所有特征
        y.append(data[i + window_size:i + window_size + horizon, -1])  # 目标变量
    return np.array(X), np.array(y)

window_size = 168  # 使用7天的历史数据(7 * 24)
horizon = 24  # 预测未来24小时
X, y = create_dataset(data[features + [target]].values, window_size, horizon)

# 数据集划分
train_size = int(0.7 * len(X))
val_size = int(0.2 * len(X))
test_size = len(X) - train_size - val_size

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size + val_size], y[train_size:train_size + val_size]
X_test, y_test = X[train_size + val_size:], y[train_size + val_size:]

# 转换为PyTorch张量
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# 数据加载器
batch_size = 64
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 改进的CNN模型架构
class EnhancedCNNForecaster(nn.Module):
    def __init__(self, input_dim, output_dim,
                 conv1_out=64, conv2_out=128, conv3_out=256,
                 kernel_size=3, dropout_rate=0.4, fc_units=512):
        super(EnhancedCNNForecaster, self).__init__()

        # 卷积层
        self.conv1 = nn.Conv1d(input_dim, conv1_out, kernel_size=kernel_size, padding='same')
        self.bn1 = nn.BatchNorm1d(conv1_out)

        self.conv2 = nn.Conv1d(conv1_out, conv2_out, kernel_size=kernel_size, padding='same')
        self.bn2 = nn.BatchNorm1d(conv2_out)

        self.conv3 = nn.Conv1d(conv2_out, conv3_out, kernel_size=kernel_size, padding='same')
        self.bn3 = nn.BatchNorm1d(conv3_out)

        # 池化和正则化
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(dropout_rate)

        # 计算展平后的维度
        self.flatten_dim = conv3_out * (window_size // 8)

        # 全连接层
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(self.flatten_dim, fc_units)
        self.fc2 = nn.Linear(fc_units, fc_units // 2)
        self.fc3 = nn.Linear(fc_units // 2, output_dim)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # [batch, features, seq_len]

        # 卷积块1
        x = self.conv1(x)
        x = self.bn1(x)
        x = torch.relu(x)
        x = self.pool(x)
        x = self.dropout(x)

        # 卷积块2
        x = self.conv2(x)
        x = self.bn2(x)
        x = torch.relu(x)
        x = self.pool(x)
        x = self.dropout(x)

        # 卷积块3
        x = self.conv3(x)
        x = self.bn3(x)
        x = torch.relu(x)
        x = self.pool(x)
        x = self.dropout(x)

        # 全连接层
        x = self.flatten(x)
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)

        return x

# 贝叶斯优化超参数搜索
def optimize_hyperparameters():
    def black_box_function(conv1_out, conv2_out, conv3_out, lr, dropout_rate):
        conv1_out = int(conv1_out)
        conv2_out = int(conv2_out)
        conv3_out = int(conv3_out)

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        model = EnhancedCNNForecaster(
            input_dim=len(features),
            output_dim=horizon,
            conv1_out=conv1_out,
            conv2_out=conv2_out,
            conv3_out=conv3_out,
            dropout_rate=dropout_rate
        ).to(device)

        criterion = nn.HuberLoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        # 简化训练过程用于快速评估
        model.train()
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

        # 在验证集上评估
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for X_val_batch, y_val_batch in val_loader:
                X_val_batch, y_val_batch = X_val_batch.to(device), y_val_batch.to(device)
                outputs = model(X_val_batch)
                loss = criterion(outputs, y_val_batch)
                val_loss += loss.item()

        return -val_loss

    pbounds = {
        'conv1_out': (32, 128),
        'conv2_out': (64, 256),
        'conv3_out': (128, 512),
        'lr': (1e-5, 1e-2),
        'dropout_rate': (0.2, 0.5)
    }

    optimizer = BayesianOptimization(
        f=black_box_function,
        pbounds=pbounds,
        random_state=1,
        verbose=2
    )

    optimizer.maximize(init_points=5, n_iter=15)
    return optimizer.max['params']

# 优化超参数
best_params = optimize_hyperparameters()
print(f"Optimized Hyperparameters: {best_params}")

# 使用优化后的超参数创建模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = EnhancedCNNForecaster(
    input_dim=len(features),
    output_dim=horizon,
    conv1_out=int(best_params['conv1_out']),
    conv2_out=int(best_params['conv2_out']),
    conv3_out=int(best_params['conv3_out']),
    dropout_rate=best_params['dropout_rate']
).to(device)

criterion = nn.HuberLoss()
optimizer = optim.AdamW(model.parameters(),
                       lr=best_params['lr'],
                       weight_decay=5e-4)

scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.3,
    patience=5,
    verbose=True,
    min_lr=1e-6
)

# 早停机制
class EarlyStopping:
    def __init__(self, patience=10, delta=0):
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.delta = delta

    def __call__(self, val_loss):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0

# 训练函数
def train_model(model, train_loader, val_loader, epochs=200):
    early_stopping = EarlyStopping(patience=15, delta=0.001)
    best_val_loss = float('inf')
    train_losses, val_losses = [], []

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item() * X_batch.size(0)

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for X_val_batch, y_val_batch in val_loader:
                X_val_batch, y_val_batch = X_val_batch.to(device), y_val_batch.to(device)
                outputs = model(X_val_batch)
                loss = criterion(outputs, y_val_batch)
                val_loss += loss.item() * X_val_batch.size(0)

        train_loss /= len(train_loader.dataset)
        val_loss /= len(val_loader.dataset)
        train_losses.append(train_loss)
        val_losses.append(val_loss)

        scheduler.step(val_loss)
        early_stopping(val_loss)
        if early_stopping.early_stop:
            print(f"Early stopping at epoch {epoch + 1}")
            break

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'CNN/best_cnn_model.pth')

        print(f'Epoch {epoch + 1}/{epochs}: Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}, LR: {optimizer.param_groups[0]["lr"]:.2e}')

    # 保存训练曲线
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('CNN/loss_curve.png')
    plt.close()

    print(f'Training complete. Best Validation Loss: {best_val_loss:.6f}')

# 训练模型
train_model(model, train_loader, val_loader, epochs=200)

# 测试模型
model.load_state_dict(torch.load('CNN/best_cnn_model.pth'))
model.eval()
test_loss = 0
predictions, actuals = [], []
with torch.no_grad():
    for X_test_batch, y_test_batch in test_loader:
        X_test_batch, y_test_batch = X_test_batch.to(device), y_test_batch.to(device)
        outputs = model(X_test_batch)
        loss = criterion(outputs, y_test_batch)
        test_loss += loss.item() * X_test_batch.size(0)
        predictions.append(outputs.cpu().numpy())
        actuals.append(y_test_batch.cpu().numpy())

test_loss /= len(test_loader.dataset)
print(f'Test Loss: {test_loss:.6f}')

# 反标准化函数
def inverse_transform(data, scaler, feature_names, target_name):
    dummy = np.zeros((len(data), len(feature_names) + 1))
    target_idx = feature_names.index(target_name) if target_name in feature_names else len(feature_names)
    dummy[:, target_idx] = data
    return scaler.inverse_transform(dummy)[:, target_idx]

# 反标准化预测结果
preds = np.concatenate(predictions)
trues = np.concatenate(actuals)

# 为所有样本反标准化
all_preds_orig = []
all_trues_orig = []
for i in range(len(preds)):
    all_preds_orig.extend(inverse_transform(preds[i], scaler, features, target))
    all_trues_orig.extend(inverse_transform(trues[i], scaler, features, target))

# 转换为numpy数组
all_preds_orig = np.array(all_preds_orig)
all_trues_orig = np.array(all_trues_orig)

# 计算百分比误差
epsilon = 1e-10  # 防止除以零
error_percent = np.abs((all_preds_orig - all_trues_orig) / (all_trues_orig + epsilon)) * 100

# 创建并保存预测结果CSV文件
results_df = pd.DataFrame({
    'Actual': all_trues_orig,
    'Predicted': all_preds_orig,
    'Error(%)': error_percent
})
results_df.to_csv('CNN/prediction_results.csv', index=False)

# 计算评估指标
mae = np.mean(np.abs(all_preds_orig - all_trues_orig))
max_error = np.max(np.abs(all_preds_orig - all_trues_orig))
mape = np.mean(error_percent)

# 保存指标到文件
with open('CNN/test_metrics.txt', 'w') as f:
    f.write(f'Test Loss (Huber): {test_loss:.6f}\n')
    f.write(f'MAE: {mae:.4f}\n')
    f.write(f'Max Error: {max_error:.4f}\n')
    f.write(f'MAPE: {mape:.2f}%\n')

# 可视化结果(使用第一个测试样本)
plt.figure(figsize=(14, 7))
plt.plot(all_trues_orig[:horizon], label='Actual', linewidth=2)
plt.plot(all_preds_orig[:horizon], label='Predicted', linestyle='--', linewidth=1.5)
plt.title('CNN Forecast vs Actual', fontsize=16)
plt.xlabel('Time (hours)', fontsize=14)
plt.ylabel('OT Value', fontsize=14)
plt.legend(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)
plt.savefig('CNN/cnn_forecast.png', dpi=300, bbox_inches='tight')
plt.close()

print("All results saved in 'CNN' folder")

TCN_Forecast:

import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import copy
import math
import warnings
warnings.filterwarnings('ignore')

os.makedirs('TCN', exist_ok=True)

# 加载数据
data = pd.read_csv('ETT/ETTh1.csv')
data['date'] = pd.to_datetime(data['date'])
data.set_index('date', inplace=True)

# 创建时间序列特征
data['hour'] = data.index.hour
data['dayofweek'] = data.index.dayofweek
data['dayofyear'] = data.index.dayofyear
data['month'] = data.index.month

# 数据预处理
target = 'OT'
features = ['HUFL', 'HULL', 'MUFL', 'MULL', 'LUFL', 'LULL', 'hour', 'dayofweek', 'dayofyear', 'month']

# 分别标准化特征和目标变量
scaler_features = StandardScaler()
scaler_target = StandardScaler()

data[features] = scaler_features.fit_transform(data[features])
data[[target]] = scaler_target.fit_transform(data[[target]])


# 创建时间序列样本
def create_dataset(data, window_size, horizon):
    X, y = [], []
    for i in range(len(data) - window_size - horizon + 1):
        X.append(data[i:i + window_size][features].values)
        y.append(data[i + window_size:i + window_size + horizon][target].values)
    return np.array(X), np.array(y)


window_size = 168  # 使用7天的历史数据
horizon = 24  # 预测未来24小时
X, y = create_dataset(data, window_size, horizon)

# 数据集划分
train_size = int(0.7 * len(X))
val_size = int(0.2 * len(X))
test_size = len(X) - train_size - val_size

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size + val_size], y[train_size:train_size + val_size]
X_test, y_test = X[train_size + val_size:], y[train_size + val_size:]

# 转换为PyTorch张量
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# 数据加载器
batch_size = 128
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


# 改进的TCN块定义
class EnhancedTemporalBlock(nn.Module):
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, dropout=0.3):
        super(EnhancedTemporalBlock, self).__init__()
        padding = (kernel_size - 1) * dilation

        self.conv1 = nn.Conv1d(n_inputs, n_outputs, kernel_size,
                               stride=stride, padding=0, dilation=dilation)
        self.norm1 = nn.LayerNorm(n_outputs)
        self.gelu1 = nn.GELU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = nn.Conv1d(n_outputs, n_outputs, kernel_size,
                               stride=stride, padding=0, dilation=dilation)
        self.norm2 = nn.LayerNorm(n_outputs)
        self.gelu2 = nn.GELU()
        self.dropout2 = nn.Dropout(dropout)

        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.upsample = None
        self.relu = nn.ReLU()
        self.padding = padding
        self.init_weights()

    def init_weights(self):
        nn.init.kaiming_normal_(self.conv1.weight, nonlinearity='relu')
        nn.init.kaiming_normal_(self.conv2.weight, nonlinearity='relu')
        if self.downsample is not None:
            nn.init.kaiming_normal_(self.downsample.weight, nonlinearity='relu')

    def forward(self, x):
        # 对称填充
        x_padded = torch.nn.functional.pad(x, (self.padding // 2, self.padding // 2))
        out = self.conv1(x_padded)
        out = out.permute(0, 2, 1)
        out = self.norm1(out)
        out = out.permute(0, 2, 1)
        out = self.gelu1(out)
        out = self.dropout1(out)

        out_padded = torch.nn.functional.pad(out, (self.padding // 2, self.padding // 2))
        out = self.conv2(out_padded)
        out = out.permute(0, 2, 1)
        out = self.norm2(out)
        out = out.permute(0, 2, 1)
        out = self.gelu2(out)
        out = self.dropout2(out)

        res = x if self.downsample is None else self.downsample(x)

        # 确保维度匹配
        diff = out.size(2) - res.size(2)
        if diff > 0:
            res = torch.nn.functional.pad(res, (0, diff))
        elif diff < 0:
            res = res[:, :, :out.size(2)]

        return self.relu(out + res)


# 改进的TCN模型
class EnhancedTCNForecaster(nn.Module):
    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super(EnhancedTCNForecaster, self).__init__()
        layers = []
        num_levels = len(num_channels)

        # 初始投影层
        self.init_conv = nn.Conv1d(input_size, num_channels[0], 1)
        self.init_norm = nn.BatchNorm1d(num_channels[0])

        for i in range(num_levels):
            dilation_size = 2 ** i
            in_channels = num_channels[0] if i == 0 else num_channels[i - 1]
            out_channels = num_channels[i]
            layers += [EnhancedTemporalBlock(in_channels, out_channels, kernel_size,
                                             stride=1, dilation=dilation_size,
                                             dropout=dropout)]

        self.tcn_blocks = nn.Sequential(*layers)

        # 添加注意力机制
        self.attention = nn.Sequential(
            nn.Linear(num_channels[-1], num_channels[-1] // 2),
            nn.ReLU(),
            nn.Linear(num_channels[-1] // 2, 1),
            nn.Softmax(dim=1)
        )

        # 扩展的全连接层
        self.fc = nn.Sequential(
            nn.Linear(num_channels[-1], num_channels[-1] * 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(num_channels[-1] * 2, num_channels[-1]),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(num_channels[-1], output_size)
        )

    def forward(self, x):
        x = x.permute(0, 2, 1)  # [batch, features, seq_len]
        out = self.init_conv(x)
        out = self.init_norm(out)
        out = self.tcn_blocks(out)

        # 时间注意力
        out_perm = out.permute(0, 2, 1)  # [batch, seq_len, channels]
        attn_weights = self.attention(out_perm)
        context = torch.sum(attn_weights * out_perm, dim=1)

        output = self.fc(context)
        return output


# 模型参数
input_size = len(features)
output_size = horizon
num_channels = [64, 48, 32]  # 三层结构,递减通道数
kernel_size = 5
dropout = 0.3

# 模型初始化
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = EnhancedTCNForecaster(input_size, output_size, num_channels, kernel_size, dropout).to(device)


# 梯度归一化层
class GradNorm:
    def __init__(self, alpha=0.5):
        self.alpha = alpha
        self.grad_norm = None

    def step(self, parameters):
        if self.grad_norm is None:
            total_norm = 0
            for p in parameters:
                if p.grad is not None:
                    param_norm = p.grad.data.norm(2)
                    total_norm += param_norm.item() ** 2
            self.grad_norm = total_norm ** 0.5
        else:
            total_norm = 0
            for p in parameters:
                if p.grad is not None:
                    param_norm = p.grad.data.norm(2)
                    total_norm += param_norm.item() ** 2
            current_norm = total_norm ** 0.5
            # 指数移动平均
            self.grad_norm = self.alpha * self.grad_norm + (1 - self.alpha) * current_norm

            # 梯度归一化
            clip_coef = self.grad_norm / (current_norm + 1e-6)
            for p in parameters:
                if p.grad is not None:
                    p.grad.data.mul_(min(clip_coef, 1.0))


# 损失函数
criterion = nn.HuberLoss(delta=0.5)  # 使用Huber损失对异常值更鲁棒
optimizer = optim.AdamW(model.parameters(), lr=8e-5, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.6, patience=3, verbose=True, min_lr=1e-6
)
grad_norm = GradNorm(alpha=0.8)


# 训练函数
def train_model(model, train_loader, val_loader, epochs=60, patience=5):
    best_val_loss = float('inf')
    best_model_weights = None
    counter = 0

    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        optimizer.zero_grad()

        for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            # 自适应数据增强
            if epoch > 5:  # 前5个epoch不添加噪声
                noise_level = max(0.02 - 0.002 * epoch, 0.005)  # 随训练衰减
                noise = torch.randn_like(X_batch) * noise_level
                X_batch = X_batch + noise

            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()

            # 梯度处理
            if batch_idx % 4 == 0:
                # 梯度裁剪 + 归一化
                grad_norm.step(model.parameters())
                optimizer.step()
                optimizer.zero_grad()

            train_loss += loss.item() * X_batch.size(0)

        # 处理余下的梯度
        if len(train_loader) % 4 != 0:
            grad_norm.step(model.parameters())
            optimizer.step()
            optimizer.zero_grad()

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for X_val, y_val in val_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                outputs = model(X_val)
                loss = criterion(outputs, y_val)
                val_loss += loss.item() * X_val.size(0)

        train_loss /= len(train_loader.dataset)
        val_loss /= len(val_loader.dataset)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        print(f'Epoch {epoch + 1}/{epochs}: Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')

        scheduler.step(val_loss)

        # 保存最佳模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_weights = copy.deepcopy(model.state_dict())
            counter = 0
            torch.save(model.state_dict(), 'TCN/best_tcn_model.pth')
            print(f"Saved new best model with Val Loss: {val_loss:.6f}")
        else:
            counter += 1
            if counter >= patience:
                print(f"Early stopping at epoch {epoch + 1}")
                break

    # 绘制训练曲线
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('TCN/training_curve.png', dpi=300)
    plt.close()

    model.load_state_dict(best_model_weights)
    print(f'Training complete. Best Validation Loss: {best_val_loss:.6f}')
    return model


# 训练模型
print(f"Training model on {device}...")
model = train_model(model, train_loader, val_loader, epochs=60, patience=5)

# 测试模型
model.load_state_dict(torch.load('TCN/best_tcn_model.pth'))
model.eval()
test_loss = 0
predictions, actuals = [], []
with torch.no_grad():
    for X_test, y_test in test_loader:
        X_test, y_test = X_test.to(device), y_test.to(device)
        outputs = model(X_test)
        loss = criterion(outputs, y_test)
        test_loss += loss.item() * X_test.size(0)
        predictions.append(outputs.cpu().numpy())
        actuals.append(y_test.cpu().numpy())

test_loss /= len(test_loader.dataset)
print(f'Test Loss: {test_loss:.6f}')


# 反标准化函数
def inverse_transform_target(data, scaler):
    return scaler.inverse_transform(data.reshape(-1, 1)).flatten()


# 可视化预测结果
preds = np.concatenate(predictions)
trues = np.concatenate(actuals)

# 反标准化
preds_orig = inverse_transform_target(preds[0], scaler_target)
trues_orig = inverse_transform_target(trues[0], scaler_target)

# 计算相对百分比误差
error = np.abs(preds_orig - trues_orig) / (np.max(trues_orig) - np.min(trues_orig))

plt.figure(figsize=(15, 10))
plt.subplot(3, 1, 1)
plt.plot(trues_orig, label='Actual', color='#2c7bb6', linewidth=2)
plt.plot(preds_orig, label='Predicted', color='#d7191c', linewidth=1.5, alpha=0.9)
plt.title('TCN Forecast vs Actual')
plt.ylabel('OT Value')
plt.legend()

plt.subplot(3, 1, 2)
plt.plot(error * 100, color='#fdae61', linewidth=1.5)
plt.fill_between(range(len(error)), 0, error * 100, color='#fee090', alpha=0.6)
plt.title('Percentage Error')
plt.ylabel('Error (%)')
plt.ylim(0, 20)

plt.subplot(3, 1, 3)
plt.scatter(trues_orig, preds_orig, alpha=0.6, color='#2c7bb6')
plt.plot([min(trues_orig), max(trues_orig)], [min(trues_orig), max(trues_orig)],
         '--', color='#d7191c', linewidth=1.5)
plt.title('Actual vs Predicted')
plt.xlabel('Actual')
plt.ylabel('Predicted')
plt.grid(alpha=0.3)

plt.tight_layout()
plt.savefig('TCN/tcn_forecast.png', dpi=300)
plt.close()

# 保存预测结果
results = pd.DataFrame({
    'Actual': trues_orig,
    'Predicted': preds_orig,
    'Error(%)': error * 100
})
results.to_csv('TCN/prediction_results.csv', index=False)

# 保存测试指标
with open('TCN/test_metrics.txt', 'w') as f:
    f.write(f'Test Loss: {test_loss:.6f}\n')
    f.write(f'Mean Absolute Error: {np.mean(np.abs(preds_orig - trues_orig)):.4f}\n')
    f.write(f'Max Error: {np.max(np.abs(preds_orig - trues_orig)):.4f}\n')
    f.write(f'Mean Percentage Error: {np.mean(error) * 100:.2f}%\n')

print("All results saved in 'TCN' folder")

MICN_Forecast:
 

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset, Dataset
import matplotlib.pyplot as plt
import random
import warnings
import os

# 过滤警告
warnings.filterwarnings('ignore')

os.makedirs('MICN', exist_ok=True)

# 加载数据
data = pd.read_csv('ETT/ETTh1.csv')
data['date'] = pd.to_datetime(data['date'])
data.set_index('date', inplace=True)

# 数据预处理
features = ['HUFL', 'HULL', 'MUFL', 'MULL', 'LUFL', 'LULL', 'OT']
scaler = MinMaxScaler(feature_range=(-1, 1))
data[features] = scaler.fit_transform(data[features])

# 添加时间特征
data['hour'] = data.index.hour
data['day_of_week'] = data.index.dayofweek
data['day_of_month'] = data.index.day
data['month'] = data.index.month

new_features = features + ['hour', 'day_of_week', 'day_of_month', 'month']
target = 'OT'

# 创建数据集
def create_dataset(data, window_size, horizon, augment_prob=0.3):
    X, y = [], []
    total_length = len(data) - window_size - horizon

    for i in range(total_length):
        reverse = random.random() < augment_prob
        start = i if not reverse else len(data) - i - window_size - horizon

        X_seq = data[new_features].iloc[start:start + window_size].values
        y_seq = data[target].iloc[start + window_size:start + window_size + horizon].values

        if reverse:
            X_seq = np.flip(X_seq, axis=0)
            y_seq = np.flip(y_seq, axis=0)

        X.append(X_seq)
        y.append(y_seq)

    return np.array(X), np.array(y)

window_size = 168
horizon = 24
X, y = create_dataset(data, window_size, horizon)

# 数据集划分
train_size = int(0.7 * len(X))
val_size = int(0.2 * len(X))
test_size = len(X) - train_size - val_size

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size + val_size], y[train_size:train_size + val_size]
X_test, y_test = X[train_size + val_size:], y[train_size + val_size:]

# 转换为PyTorch张量
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

# 数据加载器
class CustomDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

batch_size = 128
train_dataset = CustomDataset(X_train, y_train)
val_dataset = CustomDataset(X_val, y_val)
test_dataset = CustomDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 改进的MICN模型
class EnhancedMICN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        # 多尺度卷积分支
        self.branch1 = nn.Sequential(
            nn.Conv1d(input_dim, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.Conv1d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.3)
        )
        self.branch2 = nn.Sequential(
            nn.Conv1d(input_dim, 128, kernel_size=5, padding=2),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.Conv1d(128, 128, kernel_size=5, padding=2),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.3)
        )
        self.branch3 = nn.Sequential(
            nn.Conv1d(input_dim, 128, kernel_size=7, padding=3),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.Conv1d(128, 128, kernel_size=7, padding=3),
            nn.BatchNorm1d(128),
            nn.GELU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.3)
        )
        # 特征融合
        self.fusion = nn.Sequential(
            nn.Conv1d(384, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.GELU(),
            nn.Conv1d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.GELU(),
            nn.AdaptiveAvgPool1d(1)
        )
        # 预测头
        self.forecast = nn.Sequential(
            nn.Linear(256, 512),
            nn.GELU(),
            nn.BatchNorm1d(512),
            nn.Dropout(0.4),
            nn.Linear(512, 256),
            nn.GELU(),
            nn.BatchNorm1d(256),
            nn.Linear(256, output_dim)
        )

    def forward(self, x):
        x = x.permute(0, 2, 1)
        b1 = self.branch1(x)
        b2 = self.branch2(x)
        b3 = self.branch3(x)
        combined = torch.cat((b1, b2, b3), dim=1)
        fused = self.fusion(combined).squeeze(-1)
        return self.forecast(fused)

# 初始化模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = EnhancedMICN(input_dim=len(new_features), output_dim=horizon).to(device)
criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=8, verbose=True)

# 训练函数
def train_model(model, train_loader, val_loader, epochs=100):
    best_val_loss = float('inf')
    no_improve = 0
    patience = 15

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()

        # 验证
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for X_val, y_val in val_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                outputs = model(X_val)
                val_loss += criterion(outputs, y_val).item()

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        print(f'Epoch {epoch + 1}/{epochs}: Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')
        scheduler.step(val_loss)

        # 早停
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'MICN/best_micn_model.pth')
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= patience:
                print(f'Early stopping at epoch {epoch + 1}')
                break

    print(f'Training complete. Best Validation Loss: {best_val_loss:.6f}')

# 训练模型
train_model(model, train_loader, val_loader)

# 测试模型
model.load_state_dict(torch.load('MICN/best_micn_model.pth'))
model.eval()

test_loss = 0
predictions, actuals = [], []
with torch.no_grad():
    for X_test, y_test in test_loader:
        X_test, y_test = X_test.to(device), y_test.to(device)
        outputs = model(X_test)
        test_loss += criterion(outputs, y_test).item()
        predictions.append(outputs.cpu().numpy())
        actuals.append(y_test.cpu().numpy())

test_loss /= len(test_loader)
print(f'Test Loss (MSE): {test_loss:.6f}')

# 反标准化函数
def inverse_transform(data, scaler, feature_idx):
    temp = np.zeros((data.shape[0] * data.shape[1], len(features)))
    temp[:, feature_idx] = data.ravel()
    temp = scaler.inverse_transform(temp)
    return temp[:, feature_idx].reshape(data.shape[0], data.shape[1])

# 获取反标准化数据
feature_idx = features.index('OT')
preds = np.concatenate(predictions)
trues = np.concatenate(actuals)
preds_orig = inverse_transform(preds, scaler, feature_idx)
trues_orig = inverse_transform(trues, scaler, feature_idx)

# 计算所有指标
mae = np.mean(np.abs(preds_orig - trues_orig))
max_error = np.max(np.abs(preds_orig - trues_orig))
mape = np.mean(np.abs((preds_orig - trues_orig) / (trues_orig + 1e-10))) * 100  # 避免除以零

# 保存指标
with open('MICN/test_metrics.txt', 'w') as f:
    f.write(f'Test Loss (MSE): {test_loss:.6f}\n')
    f.write(f'MAE: {mae:.4f}\n')
    f.write(f'Max Error: {max_error:.4f}\n')
    f.write(f'MAPE: {mape:.2f}%\n')

# 可视化结果
plt.figure(figsize=(16, 10))
plt.subplot(3, 1, 1)
plt.plot(trues_orig[0], label='Actual', color='blue')
plt.plot(preds_orig[0], label='Predicted', color='red', alpha=0.7)
plt.title('MICN Forecast vs Actual')
plt.ylabel('OT Value')
plt.legend()

plt.subplot(3, 1, 2)
error = np.abs(preds_orig[0] - trues_orig[0])
plt.plot(error, color='orange')
plt.fill_between(range(len(error)), 0, error, color='yellow', alpha=0.3)
plt.title('Absolute Error')
plt.ylabel('Error')

plt.subplot(3, 1, 3)
plt.scatter(trues_orig, preds_orig, alpha=0.3)
plt.plot([min(trues_orig.min(), preds_orig.min()), max(trues_orig.max(), preds_orig.max())],
         [min(trues_orig.min(), preds_orig.min()), max(trues_orig.max(), preds_orig.max())],
         'r--')
plt.title('Actual vs Predicted')
plt.xlabel('Actual')
plt.ylabel('Predicted')

plt.tight_layout()
plt.savefig('MICN/micn_results.png')
plt.close()

print("All metrics saved in 'MICN' folder")

三个模型在该ETTh1.csv上的性能对比如下表所示:

模型

测试损失

平均绝对误差(MAE)

最大误差

平均百分比误差(MAPE)

训练时间(epochs)

关键优势

CNN

0.449618

4.2397

5.9916

73.39%

25

最大误差最小,训练效率中等

TCN

0.121378

6.7929

9.4757

142.02%

6

训练速度最快,收敛性能好

MICN

0.004339

1.2459

12.6043

10.73%

68

整体精度最高,平均误差最小

表4.2 模型性能对比

综合三个模型在ETTh1数据集上的性能表现,它们各具优势:CNN在控制极端误差方面表现最佳;TCN的训练收敛速度最快,但预测精度方面存在明显不足;MICN则实现了最优的整体预测准确度,综合性能最佳,但需要更长的训练时间。

模型的训练损失对比如图4.1所示:

图4.1 训练损失对比

从上图可以看出,MICN的损失变化最为平稳,可能源于其多尺度特征融合的优势。相比之下,传统CNN受到局部感受野的限制,损失波动较大。TCN的损失则持续下降反映了膨胀卷积的层次化特征学习。

图4.2 验证损失对比

在验证损失方面,MICN模型表现出最佳的稳定性,其损失值保持在较低水平且波动较小,这表明多尺度结构能有效学习时间序列特征。TCN模型虽然损失持续下降,但整体数值高于MICN,显示其膨胀卷积结构在本任务中存在一定局限。传统CNN模型的损失波动较大且维持在较高水平,反映出局部感受野在建模时序数据时的不足。

图4.3 模型预测误差对比

MICN模型的误差线始终最低,且波动幅度最小,展现出最优异的预测稳定性;TCN模型的误差线波动最为剧烈,整体维持在较高水平;CNN模型虽然初期误差较大,但后期呈现下降趋势。整体而言,MICN模型在预测精度和稳定性方面明显优于其他两种模型,而TCN模型的表现相对最不稳定。这一对比结果直观反映了不同网络架构对预测误差的影响。

图4.4 模型预测结果对比

真实值曲线呈现明显的波动特征,而三种模型的预测结果均未能准确跟踪这种变化。CNN和TCN的预测线几乎重合,表现出相似的平稳趋势,但与真实值存在明显偏离。相比之下,MICN的预测结果虽然同样平稳,但整体更接近真实值的波动范围。这表明当前模型在捕捉时间序列数据的动态变化方面存在不足,特别是对剧烈波动的数据模式的学习能力需要进一步提升。

5 总结

CNN因其结构设计在时间序列预测中体现出强大的特征提取能力,但它的固定感受野限制了长程依赖建模能力。TCN通过因果卷积和膨胀卷积的协同作用,有效扩展了模型的时间感知范围,提升了检测的性能。MICN则在TCN的基础上进一步引入并行多分支结构,融合不同尺度的时序特征,在保持线性计算复杂度的同时显著提高了预测的精度。

在ETTh1数据集上的实验结果进一步表明,针对电力负荷预测这类复杂时序任务,MICN凭借其多尺度并行架构和自适应特征融合机制,在预测精度上显著优于传统CNN和TCN模型,平均绝对误差降低70%以上,验证了多尺度建模对时序预测的重要性。

然而,当前模型仍存在若干待改进之处。首先,所有模型对剧烈波动的预测能力有限,反映出非线性动态建模的不足;其次,TCN虽然训练效率最高,但其预测性能与计算成本不成正比;最后,MICN的优异性能以更长的训练时间为代价,在实际应用中需要权衡精度与效率。这些局限为未来研究指明了方向:开发动态卷积核以适应数据分布变化,设计轻量化多尺度架构以降低计算成本,以及探索物理约束与数据驱动的混合建模方法。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐