|
- # -*- coding: utf-8 -*-
- # ---
- # jupyter:
- # jupytext_format_version: '1.2'
- # kernelspec:
- # display_name: Python 3
- # language: python
- # name: python3
- # language_info:
- # codemirror_mode:
- # name: ipython
- # version: 3
- # file_extension: .py
- # mimetype: text/x-python
- # name: python
- # nbconvert_exporter: python
- # pygments_lexer: ipython3
- # version: 3.5.2
- # ---
-
- # # RNN 用于时间序列的分析
- # 前面我们讲到使用 RNN 做简单的图像分类的问题,但是 RNN 并不擅长此类问题,下面我们讲一讲如何将 RNN 用到时间序列的问题上,因为对于时序数据,后面的数据会用到前面的数据,LSTM 的记忆特性非常适合这种场景。
-
- # 首先我们可以读入数据,这个数据是 10 年飞机月流量,可视化得到下面的效果。
-
- import numpy as np
- import pandas as pd
- import matplotlib.pyplot as plt
- # %matplotlib inline
-
- data_csv = pd.read_csv('./data.csv', usecols=[1])
-
- plt.plot(data_csv)
-
- # 首先我们进行预处理,将数据中 `na` 的数据去掉,然后将数据标准化到 0 ~ 1 之间。
-
- # 数据预处理
- data_csv = data_csv.dropna()
- dataset = data_csv.values
- dataset = dataset.astype('float32')
- max_value = np.max(dataset)
- min_value = np.min(dataset)
- scalar = max_value - min_value
- dataset = list(map(lambda x: x / scalar, dataset))
-
- # 接着我们进行数据集的创建,我们想通过前面几个月的流量来预测当月的流量,比如我们希望通过前两个月的流量来预测当月的流量,我们可以将前两个月的流量当做输入,当月的流量当做输出。同时我们需要将我们的数据集分为训练集和测试集,通过测试集的效果来测试模型的性能,这里我们简单的将前面几年的数据作为训练集,后面两年的数据作为测试集。
-
- def create_dataset(dataset, look_back=2):
- dataX, dataY = [], []
- for i in range(len(dataset) - look_back):
- a = dataset[i:(i + look_back)]
- dataX.append(a)
- dataY.append(dataset[i + look_back])
- return np.array(dataX), np.array(dataY)
-
- # 创建好输入输出
- data_X, data_Y = create_dataset(dataset)
-
- # 划分训练集和测试集,70% 作为训练集
- train_size = int(len(data_X) * 0.7)
- test_size = len(data_X) - train_size
- train_X = data_X[:train_size]
- train_Y = data_Y[:train_size]
- test_X = data_X[train_size:]
- test_Y = data_Y[train_size:]
-
- train_Y.shape
-
- # 最后,我们需要将数据改变一下形状,因为 RNN 读入的数据维度是 (seq, batch, feature),所以要重新改变一下数据的维度,这里只有一个序列,所以 batch 是 1,而输入的 feature 就是我们希望依据的几个月份,这里我们定的是两个月份,所以 feature 就是 2.
-
- # +
- import torch
-
- train_X = train_X.reshape(-1, 1, 2)
- train_Y = train_Y.reshape(-1, 1, 1)
- test_X = test_X.reshape(-1, 1, 2)
-
- train_x = torch.from_numpy(train_X)
- train_y = torch.from_numpy(train_Y)
- test_x = torch.from_numpy(test_X)
- # -
-
- from torch import nn
- from torch.autograd import Variable
-
- # 这里定义好模型,模型的第一部分是一个两层的 RNN,每一步模型接受两个月的输入作为特征,得到一个输出特征。接着通过一个线性层将 RNN 的输出回归到流量的具体数值,这里我们需要用 `view` 来重新排列,因为 `nn.Linear` 不接受三维的输入,所以我们先将前两维合并在一起,然后经过线性层之后再将其分开,最后输出结果。
-
- # 定义模型
- class lstm_reg(nn.Module):
- def __init__(self, input_size, hidden_size, output_size=1, num_layers=2):
- super(lstm_reg, self).__init__()
-
- self.rnn = nn.LSTM(input_size, hidden_size, num_layers) # rnn
- self.reg = nn.Linear(hidden_size, output_size) # 回归
-
- def forward(self, x):
- x, _ = self.rnn(x) # (seq, batch, hidden)
- s, b, h = x.shape
- x = x.view(s*b, h) # 转换成线性层的输入格式
- x = self.reg(x)
- x = x.view(s, b, -1)
- return x
-
- # +
- net = lstm_reg(2, 4)
-
- criterion = nn.MSELoss()
- optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)
- # -
-
- # 定义好网络结构,输入的维度是 2,因为我们使用两个月的流量作为输入,隐藏层的维度可以任意指定,这里我们选的 4
-
- # 开始训练
- for e in range(1000):
- var_x = Variable(train_x)
- var_y = Variable(train_y)
- # 前向传播
- out = net(var_x)
- loss = criterion(out, var_y)
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
- if (e + 1) % 100 == 0: # 每 100 次输出结果
- print('Epoch: {}, Loss: {:.5f}'.format(e + 1, loss.data[0]))
-
- # 训练完成之后,我们可以用训练好的模型去预测后面的结果
-
- net = net.eval() # 转换成测试模式
-
- data_X = data_X.reshape(-1, 1, 2)
- data_X = torch.from_numpy(data_X)
- var_data = Variable(data_X)
- pred_test = net(var_data) # 测试集的预测结果
-
- # 改变输出的格式
- pred_test = pred_test.view(-1).data.numpy()
-
- # 画出实际结果和预测的结果
- plt.plot(pred_test, 'r', label='prediction')
- plt.plot(dataset, 'b', label='real')
- plt.legend(loc='best')
-
- # 这里蓝色的是真实的数据集,红色的是预测的结果,我们能够看到,使用 lstm 能够得到比较相近的结果,预测的趋势也与真实的数据集是相同的,因为其能够记忆之前的信息,而单纯的使用线性回归并不能得到较好的结果,从这个例子也说明了 RNN 对于序列有着非常好的性能。
-
- # **小练习:试试改变隐藏状态输出的特征数,看看有没有什么改变,同时试试使用简单的线性回归模型,看看会得到什么样的结果**
|