"""
Using torch to do time series analysis by LSTM model
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.autograd import Variable  # noqa: F401 -- unused on torch >= 0.4; kept so the file's import set is unchanged

# Location of the input CSV; only the column at index 1 is read.
DATA_PATH = "./lstm_data.csv"


def create_dataset(dataset, look_back=6):
    """Slice a series into sliding-window inputs and next-step targets.

    Args:
        dataset: The series, one observation per row (a numpy array or any
            sequence ``np.asarray`` accepts).
        look_back: Number of consecutive observations in each input window.

    Returns:
        A pair ``(data_x, data_y)`` of numpy arrays.  ``data_x[i]`` holds
        rows ``i .. i + look_back - 1`` of the series and ``data_y[i]`` is
        row ``i + look_back``.  Both are empty when the series has no more
        than ``look_back`` rows.
    """
    rows = np.asarray(dataset).tolist()
    n_windows = len(rows) - look_back  # range() below is empty when <= 0
    data_x = [rows[i:i + look_back] for i in range(n_windows)]
    data_y = [rows[i + look_back] for i in range(n_windows)]
    return np.array(data_x), np.array(data_y)


class LSTM_Reg(nn.Module):
    """LSTM followed by a linear layer applied to every time step.

    ``nn.LSTM`` is constructed without ``batch_first``, so ``forward``
    expects input laid out as ``(seq_len, batch, input_size)`` and returns
    ``(seq_len, batch, output_size)``.
    """

    def __init__(self, input_size, hidden_size, output_size=1, num_layer=2):
        super(LSTM_Reg, self).__init__()
        self.rnn = nn.LSTM(input_size, hidden_size, num_layer)
        self.reg = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x, _ = self.rnn(x)
        s, b, h = x.shape
        # Flatten (seq, batch) so the linear layer sees one row per step,
        # then restore the original leading dimensions.
        x = x.view(s * b, h)
        x = self.reg(x)
        x = x.view(s, b, -1)
        return x


def _load_series(path):
    """Read column 1 of the CSV at ``path``, drop NaN rows, return float32 values."""
    data_csv = pd.read_csv(path, usecols=[1])
    data_csv = data_csv.dropna()
    return data_csv.values.astype("float32")


def _normalize(dataset):
    """Min-max scale ``dataset`` into [0, 1]."""
    val_min = np.min(dataset)
    val_scale = np.max(dataset) - val_min
    if val_scale == 0:
        # A constant series would otherwise divide by zero and yield NaN.
        val_scale = 1.0
    return (dataset - val_min) / val_scale


def _train(net, train_x, train_y, epochs=1000, lr=1e-2):
    """Fit ``net`` with Adam on MSE loss, printing the loss every 100 epochs."""
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    for e in range(epochs):
        # forward
        out = net(train_x)
        loss = criterion(out, train_y)
        # backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # print progress
        if e % 100 == 0:
            # loss is a 0-dim tensor: .item() extracts the Python float
            # (indexing it with [0] raises on torch >= 0.4).
            print("epoch: %5d, loss: %.5f" % (e, loss.item()))
    return net


def main():
    """Load the series, train on the first 70% of windows, plot predictions."""
    # load data and pre-process
    dataset = _normalize(_load_series(DATA_PATH))

    # generate dataset
    look_back = 1
    data_X, data_Y = create_dataset(dataset, look_back)

    # The first 70% of the windows are used for training; the remainder is
    # never shown to the optimizer and only appears in the final plot.
    train_size = int(len(data_X) * 0.7)

    # convert data for torch: the whole training span is fed as a single
    # sequence (seq_len = number of windows, batch = 1).
    train_x = torch.from_numpy(
        data_X[:train_size].reshape(-1, 1, look_back)).float()
    train_y = torch.from_numpy(
        data_Y[:train_size].reshape(-1, 1, 1)).float()

    net = LSTM_Reg(look_back, 4, num_layer=1)
    _train(net, train_x, train_y, epochs=1000, lr=1e-2)

    # do test: predict over every window, train and held-out alike
    net = net.eval()
    all_x = torch.from_numpy(data_X.reshape(-1, 1, look_back)).float()
    with torch.no_grad():  # inference only, no autograd graph needed
        pred_test = net(all_x).view(-1).numpy()

    # plot: prediction i estimates dataset[i + look_back], so shift its
    # x-coordinates to line up with the real series.
    pred_index = np.arange(look_back, look_back + len(pred_test))
    plt.plot(pred_index, pred_test, 'r', label="Prediction")
    plt.plot(dataset, 'b', label="Real")
    plt.legend(loc="best")
    plt.show()


if __name__ == "__main__":
    main()