时序-异常检测和趋势分析
实现了一个完整的时间序列分析工具,结合了深度学习(LSTM)和统计分析方法,可用于异常检测和趋势比较。
文章目录
前言
实现了一个基于LSTM(长短期记忆网络)的时间序列分析工具,主要用于电流数据的异常检测和趋势比较。
主要功能:使用PyTorch实现的LSTM模型分析时间序列数据,包括异常点检测和多序列趋势一致性比较
提示:以下是本篇文章正文内容,下面案例可供参考
一、核心组件
1. 导入依赖
文件导入了必要的库,包括numpy、pandas、PyTorch等用于数据处理和深度学习的库。
2. LSTMModelPredictor类
这是文件的核心类,提供了初始化、训练和预测功能:
class LSTMModelPredictor:
# 各种方法实现...
3. 内嵌的LSTM模型
通过PyTorch实现的LSTM神经网络模型:
class LSTMModel(nn.Module):
def __init__(self, window_size):
super(LSTMModel, self).__init__()
self.lstm1 = nn.LSTM(input_size=1, hidden_size=64, batch_first=True)
self.dropout = nn.Dropout(0.2)
self.lstm2 = nn.LSTM(input_size=64, hidden_size=32, batch_first=True)
self.dense = nn.Linear(32, 1)
二、主要功能详解
1. 配置管理
从YAML文件加载配置参数,包括窗口大小、阈值系数等:
def load_config(self, config_path):
with open(config_path, 'r') as file:
config_data = yaml.safe_load(file)
self.code_dict = config_data
self.config = config_data.get('current_timing_analysis', {})
# 从配置中读取参数
self.window_size = self.config.get('window_size', 30)
self.threshold_factor = self.config.get('threshold_factor', 3)
self.trend_threshold = self.config.get('trend_threshold', 0.1)
2. 模型训练
使用PyTorch训练LSTM模型:
def fit(self, x, y, **kwargs):
# 转换为PyTorch张量
x_tensor = torch.FloatTensor(x)
y_tensor = torch.FloatTensor(y)
# 创建数据集和数据加载器
dataset = torch.utils.data.TensorDataset(x_tensor, y_tensor)
dataloader = torch.utils.data.DataLoader(...)
# 训练循环
self.model.train()
for epoch in range(epochs):
# 训练代码...
3. 异常检测功能
predict_anomaly方法实现了基于LSTM的异常点检测:
- 数据预处理和标准化
- 训练LSTM模型预测时间序列
- 计算预测误差和阈值
- 根据阈值识别异常点
- 返回异常点索引和分析结果
def predict_anomaly(self, datas, objectId):
# 数据预处理
scaled = self.scaler.fit_transform(np.array([[one] for one in datas]))
x, y = self.create_sequences(scaled)
# 训练模型
self.fit(x, y)
# 预测和计算误差
# ...
# 计算阈值,识别异常点
threshold = mean_error + self.threshold_factor * std_error
anomalies = train_errors > (threshold + tolerance)
4. 趋势比较功能
predict_trend_comparison方法比较多个时间序列的趋势一致性:
1. 统计指标分析
计算皮尔逊相关系数
计算变化方向一致性
2. LSTM模型分析
用第一个数据集训练模型
比较不同数据集的预测结果差异
def predict_trend_comparison(self, datas, objectId):
# 相关性和方向一致性分析
for i, data in enumerate(datas[1:], 1):
correlation = np.corrcoef(datas[0], data)[0, 1]
base_diff = np.diff(datas[0])
current_diff = np.diff(data)
direction_match = np.sum((base_diff > 0) == (current_diff > 0)) / len(base_diff)
# 判断一致性
is_consistent = (correlation > correlation_threshold and
direction_match > direction_threshold)
5. 完整代码
## current_timing_analysis.py
import os
import numpy as np
import pandas as pd
import yaml
import joblib
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
# from tensorflow.keras.models import Sequential, load_model
# from tensorflow.keras.layers import LSTM, Dense, Dropout
import torch
import torch.nn as nn
from loger_config import logger
# 配置日志(可以使用logging模块)
# import logging
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# logger = logging.getLogger(__name__)
# 常量和配置管理
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(BASE_DIR, 'error_code.yaml')
class LSTMModelPredictor:
def __init__(self, config_path=CONFIG_PATH):
# 加载配置
self.load_config(config_path)
self.scaler = MinMaxScaler()
self.model = None
# if self.config.get('load_model', False) and os.path.exists(self.config['model_path']):
# self.load_model_and_scaler()
# else:
# self.build_model()
self.build_model()
logger.info("LSTMModelPredictor初始化完成")
def load_config(self, config_path):
"""加载配置文件"""
try:
with open(config_path, 'r') as file:
config_data = yaml.safe_load(file)
self.code_dict = config_data
# 获取current_timing_analysis键下的配置
self.config = config_data.get('current_timing_analysis', {})
# 从配置中读取参数
self.window_size = self.config.get('window_size', 30)
self.threshold_factor = self.config.get('threshold_factor', 3)
self.trend_threshold = self.config.get('trend_threshold', 0.1)
logger.info("配置加载成功")
except Exception as e:
logger.error(f"加载配置失败: {str(e)}")
# 设置默认配置
self.config = {'window_size': 30, 'threshold_factor': 3, 'trend_threshold': 0.1}
self.window_size = 30
self.threshold_factor = 3
self.trend_threshold = 0.1
def build_model(self):
"""构建LSTM模型 - PyTorch实现"""
try:
# 定义PyTorch模型
class LSTMModel(nn.Module):
def __init__(self, window_size):
super(LSTMModel, self).__init__()
self.lstm1 = nn.LSTM(input_size=1, hidden_size=64, batch_first=True)
self.dropout = nn.Dropout(0.2)
self.lstm2 = nn.LSTM(input_size=64, hidden_size=32, batch_first=True)
self.dense = nn.Linear(32, 1)
def forward(self, x):
# LSTM1层
x, (h_n, c_n) = self.lstm1(x)
# Dropout层
x = self.dropout(x)
# LSTM2层
x, (h_n, c_n) = self.lstm2(x)
# 只获取最后一个时间步的输出
x = h_n[-1, :, :]
# 全连接层
x = self.dense(x)
return x
# 创建模型实例
model = LSTMModel(self.window_size)
self.optimizer = torch.optim.Adam(model.parameters())
self.criterion = nn.MSELoss()
# 如果需要,打印模型结构
if self.config.get('show_summary', True):
print(model)
self.model = model
logger.info("模型构建成功")
except Exception as e:
logger.error(f"构建模型失败: {str(e)}")
raise
def save_model_and_scaler(self):
"""保存模型和数据预处理器"""
try:
# 创建目录(如果不存在)
model_dir = os.path.dirname(self.config['model_path'])
if not os.path.exists(model_dir):
os.makedirs(model_dir)
self.model.save(self.config['model_path'])
joblib.dump(self.scaler, self.config['scaler_path'])
logger.info(f"模型保存到: {self.config['model_path']}")
logger.info(f"数据预处理器保存到: {self.config['scaler_path']}")
except Exception as e:
logger.error(f"保存模型失败: {str(e)}")
def fit(self, x, y, **kwargs):
"""训练模型 - PyTorch实现"""
try:
train_params = self.config.get('train_params', {})
# 合并配置和传入的参数
train_params.update(kwargs)
# 转换为PyTorch张量
x_tensor = torch.FloatTensor(x)
y_tensor = torch.FloatTensor(y)
# 创建数据集和数据加载器
dataset = torch.utils.data.TensorDataset(x_tensor, y_tensor)
dataloader = torch.utils.data.DataLoader(
dataset,
batch_size=train_params.get('batch_size', 64),
shuffle=True
)
# 训练参数
epochs = train_params.get('epochs', 20)
verbose = train_params.get('verbose', 1)
# 训练记录
history = {'loss': [], 'val_loss': []}
# 训练循环
self.model.train()
for epoch in range(epochs):
epoch_loss = 0.0
for batch_x, batch_y in dataloader:
# 梯度清零
self.optimizer.zero_grad()
# 前向传播
outputs = self.model(batch_x)
# 计算损失
loss = self.criterion(outputs, batch_y)
# 反向传播
loss.backward()
# 更新参数
self.optimizer.step()
epoch_loss += loss.item()
# 记录损失
avg_loss = epoch_loss / len(dataloader)
history['loss'].append(avg_loss)
if verbose:
print(f'Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}')
# 如果配置中指定了保存模型,则保存
if self.config.get('save_model', False):
self.save_model_and_scaler()
return history
except Exception as e:
logger.error(f"训练模型失败: {str(e)}")
raise
def predict(self,datas):
ret = []
objectId = datas["objectId"]
name = datas['name']
data_dict = datas["datas"]
if name=="anomaly":
return self.predict_anomaly(data_dict["anomaly"],objectId)
elif name=="trend":
mdata=[]
for i in range(1, 101):
trend_index_str=f"trend_index_{i}"
if trend_index_str in data_dict:
mdata.append(data_dict[trend_index_str])
return self.predict_trend_comparison(mdata,objectId)
def predict_anomaly(self, datas,objectId):
ret=[]
"""检测异常点"""
try:
if len(datas)<100:
logger.error(f"{objectId} 异常检测失败: data len < 100")
return ret
# 数据预处理
scaled = self.scaler.fit_transform(np.array([[one] for one in datas]))
x, y = self.create_sequences(scaled)
# 训练模型
self.fit(x, y)
# 获取预测结果 PyTorch预测
self.model.eval() # 设置为评估模式
with torch.no_grad():
x_tensor = torch.FloatTensor(x)
predictions_tensor = self.model(x_tensor)
predictions = predictions_tensor.numpy()
y_orig = self.scaler.inverse_transform(y.reshape(-1, 1))
# 计算误差和阈值
# train_errors = np.abs(y_orig - predictions)
# threshold = np.mean(train_errors) + self.threshold_factor * np.std(train_errors)
# 计算误差和阈值
train_errors = np.abs(y_orig - predictions)
mean_error = np.mean(train_errors)
std_error = np.std(train_errors)
threshold = mean_error + self.threshold_factor * std_error
# 添加容差处理边界情况
tolerance = 1e-8
anomalies = train_errors > (threshold + tolerance)
# 标记异常
# anomalies = train_errors > threshold
anomaly_indices = [i for i, ano in enumerate(anomalies) if ano[0] == True]
result = {
"anomaly_indices": anomaly_indices, # 异常点的索引列表
"threshold": threshold, # 用于判断异常的阈值
"errors": train_errors.flatten().tolist(), # 预测误差列表
"predictions": predictions.flatten().tolist(), # 模型预测值列表
"original": y_orig.flatten().tolist() # 原始数据值列表
}
code = self.code_dict["AnomalyAnalyse"]["Anomaly"]["code"]
codename=self.code_dict["AnomalyAnalyse"]["Anomaly"]["name"]
logger.info(f"{objectId} 检测到{len(anomaly_indices)}个异常点")
if len(anomaly_indices) >0:
ret.append({"code": code, "name": codename, "value": "true"})
else:
ret.append({"code": code, "name": codename, "value": "false"})
return ret
except Exception as e:
logger.error(f"{objectId} 异常检测失败: {str(e)}")
raise
def predict_trend_comparison(self, datas,objectId):
"""比较多个数据集的趋势一致性"""
ret=[]
try:
if len(datas[0])<100 or len(datas[1])<100:
logger.error(f"{objectId} 趋势比较失败: data len < 100")
return ret
# 增加数据长度一致性检查
base_length = len(datas[0])
for i, data in enumerate(datas[1:], 1):
if len(data) != base_length:
logger.error(
f"{objectId} 趋势比较失败: 数据集长度不一致 (基准数据集: {base_length}, 数据集{i}: {len(data)})")
return ret
# 趋势比较前预处理
is_anomaly = False
results = []
# 1. 使用相关性和归一化后的比较代替原始数据直接比较
for i, data in enumerate(datas[1:], 1):
# 计算皮尔逊相关系数
correlation = np.corrcoef(datas[0], data)[0, 1]
# 计算归一化后的变化方向一致性
base_diff = np.diff(datas[0])
current_diff = np.diff(data)
# 变化方向一致性比较
direction_match = np.sum((base_diff > 0) == (current_diff > 0)) / len(base_diff)
# 多指标综合判断
correlation_threshold = 0.85 # 相关系数阈值
direction_threshold = 0.75 # 方向一致性阈值
is_consistent = (correlation > correlation_threshold and
direction_match > direction_threshold)
logger.info(f"数据集{i}与基准数据集分析: 相关系数={correlation:.4f}, "
f"方向一致性={direction_match:.4f}")
if not is_consistent:
is_anomaly = True
results.append({
"dataset_index": i,
"correlation": float(correlation),
"direction_match": float(direction_match),
"is_consistent": is_consistent
})
# 保留原有的LSTM模型预测比较逻辑
# 用第一个数据集训练模型
scaled = self.scaler.fit_transform(np.array([[one] for one in datas[0]]))
x, y = self.create_sequences(scaled)
self.fit(x, y)
# PyTorch预测
self.model.eval() # 设置为评估模式
with torch.no_grad():
x_tensor = torch.FloatTensor(x)
predictions_tensor = self.model(x_tensor)
predictions_base = predictions_tensor.numpy()
# 依次比较其他数据集
for i, data in enumerate(datas[1:], 1):
scaled_current = self.scaler.transform(np.array([[one] for one in data]))
x_current, y_current = self.create_sequences(scaled_current)
# PyTorch预测
self.model.eval() # 设置为评估模式
with torch.no_grad():
x_tensor = torch.FloatTensor(x_current)
predictions_tensor = self.model(x_tensor)
predictions_current = predictions_tensor.numpy()
# 计算预测结果之间的差异
data_errors = np.abs(predictions_base - predictions_current)
std = np.std(data_errors)
# 保持原有阈值判断
model_threshold = self.trend_threshold
model_consistency = std < model_threshold
logger.info(f"数据集{i}与基准数据集的LSTM预测比较:标准差={std:.4f}, 阈值={model_threshold}")
# 更新结果字典
results[i - 1].update({
"lstm_std": float(std),
"lstm_threshold": model_threshold,
"lstm_consistent": model_consistency
})
# 如果LSTM模型预测结果不一致而且相关性分析也不一致,才认为真正不一致
if not model_consistency and not results[i - 1]["is_consistent"]:
is_anomaly = True
code = self.code_dict["TrendAnalyse"]["Trend"]["code"]
codename=self.code_dict["TrendAnalyse"]["Trend"]["name"]
if is_anomaly:
ret.append({"code": code, "name": codename, "value": "true"})
logger.info(f"{objectId} 趋势比较结果:存在趋势差异")
else:
ret.append({"code": code, "name": codename, "value": "false"})
logger.info(f"{objectId} 趋势比较结果:所有趋势一致")
return ret
except Exception as e:
logger.error(f"{objectId} 趋势比较失败: {str(e)}")
raise
def plt_image(self, anomaly_indices, y):
"""可视化异常检测结果"""
try:
plt.figure(figsize=(16, 6))
plt.plot(range(len(y)), y, label='Original')
plt.scatter(anomaly_indices, y[anomaly_indices],
color='red', s=50, label='Anomaly')
plt.title('LSTM Anomaly Detection')
plt.legend()
# 如果配置指定了保存图表
if self.config.get('save_plot', False):
plt_path = self.config.get('plot_path', 'lstm_anomaly_detection.png')
plt.savefig(plt_path)
logger.info(f"Save chart to: {plt_path}")
plt.show()
except Exception as e:
logger.error(f"Failed to generate chart: {str(e)}")
def create_sequences(self, data):
"""创建时间序列窗口数据"""
x, y = [], []
for i in range(len(data) - self.window_size):
x.append(data[i:i + self.window_size])
y.append(data[i + self.window_size])
return np.array(x), np.array(y)
def load_data(self, file_path, sheet_name=None, column_name='calvalue'):
"""从文件加载数据"""
try:
if sheet_name is not None:
data = pd.read_excel(file_path, sheet_name=sheet_name)
else:
data = pd.read_excel(file_path)
values = data[column_name].values
logger.info(f"成功加载数据,形状: {values.shape}")
return values
except Exception as e:
logger.error(f"加载数据失败: {str(e)}")
raise
def main():
"""主函数示例"""
try:
# 从配置文件创建预测器实例
predictor = LSTMModelPredictor()
# 加载数据
data_path = "/data/PycharmProjects/time_sequence/电流数据.xlsx"
y = predictor.load_data(data_path, sheet_name=0)
y2 = predictor.load_data(data_path, sheet_name=1)
y3 = predictor.load_data(data_path, sheet_name=2)
# 异常检测
anomaly_result = predictor.predict_anomaly(y)
# 趋势比较
trend_result = predictor.predict_trend_comparison([y, y2, y3])
logger.info("分析完成")
except Exception as e:
logger.error(f"程序执行错误: {str(e)}")
if __name__ == '__main__':
main()
## loger_config.py
import logging, os
from logging.handlers import RotatingFileHandler
# 配置日志记录
def configure_logger(log_file):
logFolder = os.path.dirname(log_file)
if os.path.exists(logFolder) == False:
os.mkdir(logFolder)
logger = logging.getLogger(__name__)
# 创建一个文件处理程序,将日志写入文件
# file_handler = logging.FileHandler(log_file)
# file_handler.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler(log_file, maxBytes=50e6, backupCount=50)
file_handler.setLevel(logging.DEBUG)
# 创建一个屏幕日志处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# 创建一个格式化程序,定义日志消息的格式
formatter = logging.Formatter('%(asctime)s-%(threadName)s-%(filename)s:%(lineno)d-%(levelname)s: %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 将处理程序添加到记录器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(logging.DEBUG)
return logger
# 创建一个logger对象
logger = configure_logger("./logs/alg.log")
## error_code.yaml
current_timing_analysis:
# 模型基本参数
window_size: 30
threshold_factor: 3
trend_threshold: 0.1
# 模型持久化
load_model: false
save_model: false
model_path: ""
scaler_path: ""
# 训练参数
train_params:
epochs: 30
batch_size: 64
validation_split: 0.1
verbose: 1
# 可视化设置
show_summary: true
save_plot: false
plot_path: "outputs/lstm_anomaly_detection.png"
6. 方法分析
1. 统计相关性分析方法
皮尔逊相关系数适合趋势比较的原因:
1)数学基础:皮尔逊相关系数专门衡量两个变量之间的线性相关程度,非常适合比较两个时间序列的整体趋势相似性
数值范围明确:结果范围为[-1,1],使判定有明确标准(接近1表示趋势高度相似)
2)对比直观:不受两个数据集绝对值大小影响,只关注变化模式,符合趋势比较的本质需求
2. 变化方向一致性适合趋势比较的原因:
1 )关注点明确:直接比较数据上升下降的方向,这正是趋势的最基本特征
2)不受幅度干扰:仅关注方向变化,不受变化幅度大小影响
3)补充相关性分析:可能存在相关系数较高但关键点位方向不一致的情况,此指标能捕捉这类差异
3. LSTM模型预测比较方法
LSTM模型适合趋势比较的原因:
1)时序特性捕捉:LSTM网络专门设计用于学习时间序列中的长短期依赖关系和复杂模式
2)非线性模式识别:能捕捉到统计方法可能忽略的非线性趋势特征
3)预测能力:通过对未来值的预测能力测试数据间的一致性,如果两个数据集有相似趋势,模型对它们的预测结果也应当相似
4)自适应性:通过训练过程,模型能适应不同类型的趋势模式
4.为什么综合使用多种方法更可靠
将这些方法结合使用有以下优势:
1)多维度评估:从不同角度分析趋势相似性,避免单一方法的局限性
2)互补性:统计方法简单直接,捕捉整体相关性;LSTM方法能识别复杂时序关系
3)降低误判率:采用"且"逻辑(只有两种方法都显示不一致才判定为异常),提高结论可靠性
4)适应不同场景:不同类型的趋势差异可能需要不同方法才能有效检测
总之,这种多方法结合的策略对趋势比较更加全面和稳健,能够处理各种不同类型的趋势变化和差异。
1. 皮尔逊相关系数测量
(1)内在逻辑:
统计学基础:皮尔逊相关系数测量两个变量之间线性关系的强度和方向
数学原理:计算两个数据集协方差除以它们标准差的乘积
取值范围:-1到1之间,其中:
1表示完美正相关(一个上升,另一个也上升)
-1表示完美负相关(一个上升,另一个下降)
0表示无线性相关
(2)判断方法:
大于0.85的相关系数被视为"极强相关"
这意味着两个数据集的变化模式高度一致,略微的波动差异被允许
不要求完美相关(1.0),为实际应用中的噪声留有空间2. 变化方向一致性比较
(1)内在逻辑
概念本质:比较两个时间序列相邻点之间的变化方向是否一致
处理步骤:
使用np.diff()计算相邻点的差值,得到变化序列
判断变化是正值(>0)还是负值/零(≤0),即上升或下降/持平
比较两个序列相同位置的变化方向是否一致:(base_diff > 0) == (current_diff > 0)
计算方向一致的点占总点数的比例
(2)判断标准
方向一致性阈值设为0.75,意味着至少75%的时间点上,两个数据集的变化方向必须一致
这一标准关注的是序列的"走势",而非绝对值的变化量
允许25%的方向不一致,为小波动和噪声提供容忍度3. 两者的互补关系
相关性和方向一致性提供了不同角度的趋势一致性评估:
1).相关性:
关注整体模式的线性相关性
考虑变化量的大小
适合评估长期趋势的相似程度
2).方向一致性:
关注每个时间点的变化方向
只考虑上升/下降,不考虑变化量大小
适合评估走势的一致性
皮尔逊相关系数测量
1. 全面性比较
不仅考虑方向:不仅考虑数据是增加还是下降
考虑变化幅度:同时考虑变化的幅度大小
考虑整体模式:评估整个数据序列的相似程度
2. 与简单方向比较的区别
方向一致性:只关注相邻点之间的变化方向(上升/下降)
皮尔逊相关系数:关注整体数据分布和变化模式的相似性
3. 皮尔逊相关系数的优势
标准化:结果被标准化到[-1,1]范围,便于比较
不受尺度影响:不受数据绝对大小的影响
捕捉非线性关系:能够捕捉到更复杂的相关模式对于时间序列数据,皮尔逊相关系数>0.85意味着:
1.高度相似的变化模式:两个数据集的变化模式非常相似
2.成比例的变化幅度:变化幅度可能不同,但成比例
3.允许整体偏移:允许数据集之间存在整体偏移
4.容忍小波动:允许存在小的随机波动
总结
该文件实现了一个完整的时间序列分析工具,结合了深度学习(LSTM)和统计分析方法,可用于异常检测和趋势比较。其模块化设计和配置化管理使其可灵活应用于不同的时间序列分析场景。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)