【金融】【pytorch】使用深度学习预测期货收盘价涨跌——LSTM模型构建与训练
【金融】【LSTM】使用LSTM预测期货收盘价涨跌——模型构建与训练。目录:LSTM、创建模型、模型训练、查看指标。LSTM 创建模型:指标函数参考《如何用keras/tf/pytorch实现TP/TN/FP/FN和accuracy/sensitivity/precision/specificity/f1-score等评价指标(python)》。代码摘要:# 二、创建LSTM模型 hidden_size = 10 output_siz…
·
LSTM
创建模型
指标函数参考《如何用keras/tf/pytorch实现TP/TN/FP/FN和accuracy/sensitivity/precision/specificity/f1-score等评价指标(python)》
# Part 2: build the LSTM model.
# Number of input features: one per column of the feature matrix.
input_size = miData.shape[1]
# Width of the LSTM hidden state and of the first linear layer.
hidden_size = 10
# Number of scores produced per sample by the final linear layer.
output_size = 2
class RNN(nn.Module):
    """Two stacked LSTM layers followed by a two-layer linear head.

    Layer sizes are taken from the module-level ``input_size``,
    ``hidden_size`` and ``output_size`` values when the model is built.
    """

    def __init__(self):
        super(RNN, self).__init__()
        # TODO: tune the hyperparameters.
        # num_layers=2 stacks two LSTMs; the second consumes the output of
        # the first.
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=2)
        # Hidden projection: hidden_size -> hidden_size.
        self.out = nn.Linear(hidden_size, hidden_size)
        # Scoring layer: hidden_size -> output_size.
        self.out2 = nn.Linear(hidden_size, output_size)
        self.s = nn.ReLU()

    def forward(self, x):
        """Return per-step class scores for ``x``.

        Args:
            x: input laid out as (seq_len, batch, input_size), the default
                layout of ``nn.LSTM`` (``batch_first`` is not set).

        Returns:
            Tensor of shape (seq_len, batch, output_size) holding raw,
            unnormalized scores (logits).
        """
        x1, _ = self.lstm(x)
        # LSTM output: (seq_len, batch, hidden_size * num_directions).
        seq_len, batch, width = x1.shape
        # Linear layers work on 2-D input, so fold time and batch together
        # while keeping the feature dimension intact.
        hidden = self.s(self.out(x1.reshape(-1, width)))
        # No activation after the last layer: nn.CrossEntropyLoss expects raw
        # logits, and a ReLU here would clamp negative scores to zero.
        logits = self.out2(hidden)
        # Restore the (seq_len, batch, ...) layout for the caller.
        return logits.view(seq_len, batch, -1)
# Build the network and show its layer summary.
rnn = RNN()
print(rnn)

# TODO: revisit the choice of loss function.
loss_func = nn.CrossEntropyLoss()

# Parameter optimization: Adam over every model parameter.
optimizer = torch.optim.Adam(rnn.parameters(), lr=0.001)

# Cuts the learning rate by 10x after 20 steps without a lower monitored value.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=20,
    verbose=True,
)
# SR : Segmentation Result
# GT : Ground Truth
def get_accuracy(SR, GT, threshold=0.5):
    """Return the fraction of elements whose binarized prediction matches the label.

    Args:
        SR: prediction tensor; values above ``threshold`` count as positive.
        GT: label tensor, expected to match ``SR`` in shape. Elements equal
            to its maximum count as positive, provided that maximum is
            above zero.
        threshold: cut-off applied to ``SR``.

    Returns:
        Accuracy as a Python float; 0.0 when ``SR`` or ``GT`` is empty.
    """
    total = SR.numel()
    if total == 0 or GT.numel() == 0:
        return 0.0
    pred = SR > threshold
    # The "> 0" guard keeps an all-zero label tensor from being read as
    # all-positive (its maximum would otherwise match every element).
    truth = (GT == torch.max(GT)) & (GT > 0)
    corr = torch.sum(pred == truth)
    # numel() counts every element, so any tensor rank is supported.
    return float(corr) / float(total)
def get_recall(SR, GT, threshold=0.5):
    """Return recall (sensitivity): TP / (TP + FN).

    Args:
        SR: prediction tensor; values above ``threshold`` count as positive.
        GT: label tensor, expected to match ``SR`` in shape. Elements equal
            to its maximum count as positive, provided that maximum is
            above zero.
        threshold: cut-off applied to ``SR``.

    Returns:
        Recall as a Python float. A 1e-6 term in the denominator avoids
        division by zero, so the result is 0.0 when there are no positive
        labels. Empty input also yields 0.0.
    """
    if SR.numel() == 0 or GT.numel() == 0:
        return 0.0
    pred = SR > threshold
    # The "> 0" guard keeps an all-zero label tensor from being read as
    # all-positive (its maximum would otherwise match every element).
    truth = (GT == torch.max(GT)) & (GT > 0)
    # TP: predicted positive and actually positive.
    tp = float(torch.sum(pred & truth))
    # FN: predicted negative but actually positive.
    fn = float(torch.sum(~pred & truth))
    return tp / (tp + fn + 1e-6)
def get_specificity(SR, GT, threshold=0.5):
    """Return specificity: TN / (TN + FP).

    Args:
        SR: prediction tensor; values above ``threshold`` count as positive.
        GT: label tensor, expected to match ``SR`` in shape. Elements equal
            to its maximum count as positive, provided that maximum is
            above zero.
        threshold: cut-off applied to ``SR``.

    Returns:
        Specificity as a Python float. A 1e-6 term in the denominator avoids
        division by zero, so the result is 0.0 when there are no negative
        labels. Empty input also yields 0.0.
    """
    if SR.numel() == 0 or GT.numel() == 0:
        return 0.0
    pred = SR > threshold
    # The "> 0" guard keeps an all-zero label tensor from being read as
    # all-positive (its maximum would otherwise match every element).
    truth = (GT == torch.max(GT)) & (GT > 0)
    # TN: predicted negative and actually negative.
    tn = float(torch.sum(~pred & ~truth))
    # FP: predicted positive but actually negative.
    fp = float(torch.sum(pred & ~truth))
    return tn / (tn + fp + 1e-6)
def get_precision(SR, GT, threshold=0.5):
    """Return precision: TP / (TP + FP).

    Args:
        SR: prediction tensor; values above ``threshold`` count as positive.
        GT: label tensor, expected to match ``SR`` in shape. Elements equal
            to its maximum count as positive, provided that maximum is
            above zero.
        threshold: cut-off applied to ``SR``.

    Returns:
        Precision as a Python float. A 1e-6 term in the denominator avoids
        division by zero, so the result is 0.0 when nothing is predicted
        positive. Empty input also yields 0.0.
    """
    if SR.numel() == 0 or GT.numel() == 0:
        return 0.0
    pred = SR > threshold
    # The "> 0" guard keeps an all-zero label tensor from being read as
    # all-positive (its maximum would otherwise match every element).
    truth = (GT == torch.max(GT)) & (GT > 0)
    # TP: predicted positive and actually positive.
    tp = float(torch.sum(pred & truth))
    # FP: predicted positive but actually negative.
    fp = float(torch.sum(pred & ~truth))
    return tp / (tp + fp + 1e-6)
模型训练
import os

# Test-phase histories: loss_history gets one entry per test batch, the
# metric lists get one entry per processed dataset index k.
loss_history = []
accuracy_his = []
precision_his = []
recall_his = []
specificity_his = []
# Labels and softmax outputs accumulated across every processed dataset.
all_y = torch.tensor([])
all_p = torch.tensor([])

# Part 3: train the model.
# Each k selects one train/test split through train_ptr / test_ptr / end_ptr.
# The same `rnn` instance keeps training across splits (it is not re-created).
for k in range(len(end_ptr)):
    if end_ptr[k] >= len(miData):
        break
    trainX, trainY = create_dataset(miData[train_ptr[k]:test_ptr[k], :], yData[train_ptr[k]:test_ptr[k]], 10)
    trainLoaderX, trainLoaderY, validateLoaderX, validateLoaderY = trainSet_split(trainX, trainY)
    # The test slice starts 10 rows early, matching the 10 passed as the last
    # argument (presumably the look-back window length -- confirm against
    # create_Test_dataset).
    testLoaderX, testLoaderY = create_Test_dataset(miData[test_ptr[k]-10:end_ptr[k], :], yData[test_ptr[k]-10:end_ptr[k]], 10)
    print('\nDataSet No.{} data row {}-{}-{}'.format(k, train_ptr[k], test_ptr[k], end_ptr[k]))

    # ---- Training and validation ----
    # Lowest summed epoch loss seen so far. It starts at 10, so progress is
    # only counted once the summed loss drops below that value.
    loss_sum_flag = 10
    fall_cnt = 0
    train_len = len(trainLoaderX)
    train_loss_his = []
    for epoch in range(0, 1000):
        loss_sum_item = 0
        for i, var_x in enumerate(trainLoaderX, 0):
            var_y = trainLoaderY[i]
            out = rnn(var_x)
            # Flatten to (N, 2) scores against (N,) targets.
            loss = loss_func(out.view(-1, 2), var_y.view(-1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_sum_item += loss.item()

        # Progress trace: '>' for a new best epoch, '-' otherwise, and a full
        # line on every 100th improvement.
        if loss_sum_item < loss_sum_flag:
            loss_sum_flag = loss_sum_item
            fall_cnt += 1
            if fall_cnt % 100 == 0:
                print('\nDataSet No.{}, Train Epoch:{}, Avg Loss:{:.10f}'.format(k, epoch, loss_sum_item/train_len))
            else:
                print('>', end='')
        else:
            print('-', end='')
        train_loss_his.append(loss_sum_item)

        # Validation: score the validation batches against the validation
        # labels. no_grad() skips building autograd graphs that are never
        # back-propagated.
        loss_sum_validate = 0
        with torch.no_grad():
            for i, var_x in enumerate(validateLoaderX, 0):
                var_y = validateLoaderY[i]
                out = rnn(var_x)
                loss = loss_func(out.view(-1, 2), var_y.view(-1))
                loss_sum_validate += loss.item()
        # TODO: confirm whether stepping the scheduler on the summed
        # validation loss is the right usage:
        # scheduler.step(loss_sum_validate)
        if (epoch+1) % 200 == 0:
            print('\nValidate Epoch:{}, Loss Avg:{:.5f}'.format(epoch, loss_sum_validate/len(validateLoaderX)))

    plt.plot(train_loss_his)
    plt.show()
    # Create the output directory first so a missing folder cannot discard a
    # finished training run.
    os.makedirs("main_models", exist_ok=True)
    torch.save(obj=rnn.state_dict(), f="main_models/LSTM_k"+str(k)+".pth")

    # ---- Test ----
    print('Test')
    test_y = torch.tensor([])
    test_p = torch.tensor([])
    softm_p = torch.tensor([])
    # Softmax turns the two scores into probabilities (used for ROC).
    softMax_func = nn.Softmax(dim=1)
    # no_grad() keeps the accumulated outputs free of autograd history, so
    # they do not retain graphs and can be plotted later.
    with torch.no_grad():
        for i, var_x in enumerate(testLoaderX, 0):
            var_y = testLoaderY[i]
            out = rnn(var_x)
            loss = loss_func(out.view(-1, 2), var_y.view(-1))
            loss_history.append(loss.item())
            # Only the last time step is scored; the batch size is not 1, so
            # out[-1] holds one row per sample in the batch.
            test_y = torch.cat((test_y, var_y[-1]), 0)
            ind_p = torch.max(out[-1], dim=1)
            out_p = softMax_func(out[-1])
            softm_p = torch.cat((softm_p, out_p), 0)
            test_p = torch.cat((test_p, ind_p.indices.view(-1, 1)), 0)

    all_y = torch.cat((all_y, test_y), 0)
    all_p = torch.cat((all_p, softm_p), 0)
    # Compact dump: prediction plus ten times the label per sample (assumes
    # both are 0/1, giving 0, 1, 10 or 11 -- confirm label encoding).
    print((test_p + test_y*10).view(-1))

    Acc = get_accuracy(test_p, test_y)
    accuracy_his.append(Acc)
    print('------------- DataSet:{}, Accuracy:{:.5f} -------------'.format(k, Acc))
    Pc = get_precision(test_p, test_y)
    precision_his.append(Pc)
    print('------------- DataSet:{}, Precision:{:.5f} -------------'.format(k, Pc))
    Recall = get_recall(test_p, test_y)
    recall_his.append(Recall)
    print('------------- DataSet:{}, Recall:{:.5f} -------------'.format(k, Recall))
    Sp = get_specificity(test_p, test_y)
    specificity_his.append(Sp)
    print('------------- DataSet:{}, Specificity:{:.5f} -------------'.format(k, Sp))
查看指标
def _report_window(left, right, tag):
    """Score rows [left, right) of the accumulated outputs and print the metrics.

    Args:
        left: first row offset (relative to test_ptr[0]) to include.
        right: row offset at which to stop (exclusive).
        tag: label inserted after "DataSet:" in each printed line.

    Returns:
        Tuple (accuracy, precision, recall, specificity).
    """
    item_y = torch_all_y[left:right, :]
    # Predicted class = column with the highest value in each row.
    item_p = torch.max(torch_all_p[left:right, :], dim=1).indices.view(-1, 1)
    acc = get_accuracy(item_p, item_y)
    print('------------- DataSet:{}, Accuracy:{:.5f} -------------'.format(tag, acc))
    pc = get_precision(item_p, item_y)
    print('------------- DataSet:{}, Precision:{:.5f} -------------'.format(tag, pc))
    recall = get_recall(item_p, item_y)
    print('------------- DataSet:{}, Recall:{:.5f} -------------'.format(tag, recall))
    sp = get_specificity(item_p, item_y)
    print('------------- DataSet:{}, Specificity:{:.5f} -------------'.format(tag, sp))
    return acc, pc, recall, sp


# Averages first, then the raw per-dataset values.
print(np.mean(accuracy_his))
print(np.mean(recall_his))
print(np.mean(precision_his))
print(accuracy_his)
print(recall_his)
print(precision_his)

# NOTE(review): torch_all_y / torch_all_p are not defined in the visible part
# of the file; presumably they hold the accumulated all_y / all_p -- confirm.
n_sets = len(accuracy_his)
# Row offsets below are relative to the first test row, test_ptr[0].
base = test_ptr[0]

# Overall metrics across every processed dataset: from offset 0 up to the end
# of the last processed dataset.
if n_sets:
    _report_window(0, end_ptr[n_sets - 1] - base, 'total')

# Metrics over groups of four consecutive datasets ("year" in the variable
# names; presumably four datasets per year -- confirm).
group_size = 4
year_accuracy_his = []
year_precision_his = []
year_recall_his = []
year_specificity_his = []
for i in range(n_sets // group_size):
    first = i * group_size
    left = test_ptr[first] - base
    right = end_ptr[first + group_size - 1] - base
    Acc, Pc, Recall, Sp = _report_window(left, right, i)
    year_accuracy_his.append(Acc)
    year_precision_his.append(Pc)
    year_recall_his.append(Recall)
    year_specificity_his.append(Sp)

# Leftover datasets that do not fill a whole group: start at the first
# dataset after the last full group so no dataset is counted twice.
remainder = n_sets % group_size
if remainder != 0:
    left = test_ptr[n_sets - remainder] - base
    right = end_ptr[n_sets - 1] - base
    Acc, Pc, Recall, Sp = _report_window(left, right, 'final')
    year_accuracy_his.append(Acc)
    year_precision_his.append(Pc)
    year_recall_his.append(Recall)
    year_specificity_his.append(Sp)

# Grouped metrics against the 0.5 reference line.
plt.figure(figsize=(20, 7))
plt.plot(np.arange(len(year_accuracy_his)), year_accuracy_his, label='year_accuracy')
plt.plot(np.arange(len(year_recall_his)), year_recall_his, label='year_recall')
plt.plot(np.arange(len(year_specificity_his)), year_specificity_his, label='year_specificity')
plt.plot(np.arange(len(year_precision_his)), year_precision_his, label='year_precision')
plt.axhline(y=0.5, c='r', ls='--', lw=2)
plt.legend()
plt.show()

# First 1000 samples: second softmax column against the label.
plt.figure(figsize=(20, 7))
# detach() lets the tensor be plotted even if it still carries autograd history.
plt.plot(np.arange(len(all_p[:1000, 1])), all_p[:1000, 1].detach(), label='LSTM_pred')
plt.plot(np.arange(len(all_y[:1000])), all_y[:1000], label='label')
plt.axhline(y=0.5, c='r', ls='--', lw=2)
plt.legend()
plt.show()

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)