import sys
import numpy as np
from keras.datasets import imdb

def sigmoid(z):
    '''
    z is the pre-activation matrix (prev_activations in the code below), shape (n_l, m).
    '''
    return 1 / (1 + np.exp(-z))

def relu(z):
    '''
    z is the pre-activation matrix (prev_activations in the code below), shape (n_l, m).
    '''
    return np.maximum(0, z)
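
# Derivatives of these activations, used later in backpropagation:
#   sigmoid'(z) = sigmoid(z) * (1 - sigmoid(z)), i.e. a * (1 - a)
#   relu'(z)    = 1 if z > 0 else 0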

def vectorize_sequences(sequences, dimension=10000):

    results = np.zeros((len(sequences), dimension))
    for i, sequence in enumerate(sequences):
        results[i, sequence] = 1.  # set the columns listed in `sequence` to 1 (multi-hot encoding, 0-based indices)
    return results
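
# Illustrative example of the multi-hot encoding above:
#   vectorize_sequences([[0, 2], [1]], dimension=4)
#   -> [[1., 0., 1., 0.],
#       [0., 1., 0., 0.]]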

def load(path="./weigths.npz"):
    weigths = np.load(path, allow_pickle=True)
    return weigths

def save(weigths, path="./weigths.npz"):
    # Wrap the ragged per-layer lists in object arrays so np.savez can pickle them.
    np.savez(path, param_w=np.asarray(weigths["param_w"], dtype=object),
             param_b=np.asarray(weigths["param_b"], dtype=object))
    ### np.savetxt("./weigths.out", weigths["param_w"], fmt='%s')

class FFNNModel():
    def __init__(self, x, y, batchsize, epoch) -> None:
        n0 = 10000
        n1 = 20
        n2 = 7
        n3 = 5
        n4 = 1
        layers_dims = [n0, n1, n2, n3, n4]  # [10000, 20, 7, 5, 1]
        L = len(layers_dims) - 1  # a 4-layer network, not counting the input layer

        ### initialize the weights
        param_w = [None] * (L + 1)  # param_w[1..L]; index 0 is unused
        param_b = [None] * (L + 1)

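        # He initialization for the ReLU layers: W^[l] ~ N(0, 2 / n^[l-1]) keeps the
        # variance of the activations roughly stable across layers; the sigmoid
        # output layer is initialized with small random values (* 0.01) instead.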
        for l in range(1, L + 1):
            if l < L:
                param_w[l] = np.random.randn(layers_dims[l], layers_dims[l - 1]) * np.sqrt(2 / layers_dims[l - 1])
            if l == L:
                param_w[l] = np.random.randn(layers_dims[l], layers_dims[l - 1]) * 0.01
            param_b[l] = np.zeros((layers_dims[l], 1))

        self.layers_dims = layers_dims
        self.L = L

        ### store the initialized weights
        self.param_w = param_w
        self.param_b = param_b

        self.x = x
        self.y = y
        self.batchsize = batchsize
        self.epoch = epoch

    def load(self, path="./weigths.npz"):
        weigths = np.load(path, allow_pickle=True)

        self.param_w = weigths["param_w"]
        self.param_b = weigths["param_b"]

    def train(self) -> None:
        for i in range(0, self.epoch):
            cost = self.neural_network()
            print("###### Loss after epoch {}: {} \n".format(i + 1, cost))
            save({"param_w": self.param_w, "param_b": self.param_b})

    def predict(self, x_test, threshold=0.5):
        X_new = vectorize_sequences(x_test).T
        L = self.L
        param_w = self.param_w
        param_b = self.param_b

        activations = [X_new] + [None] * L
        prev_activations = [None] * (L + 1)

        for l in range(1, L + 1):
            prev_activations[l] = np.dot(param_w[l], activations[l - 1]) + param_b[l]
            if l < L:
                activations[l] = relu(prev_activations[l])
            else:
                activations[l] = sigmoid(prev_activations[l])
        prediction = (activations[L] > threshold).astype("int")
        return prediction

    ##### neural network model
    def neural_network(self, learning_rate=0.001, lambd=0):
        totalDataLen = len(self.x)
        loopSize = int(totalDataLen / self.batchsize)

        rcost = 1.0
        for i in range(0, loopSize):
            dataStartIndex = i * self.batchsize
            dataEndIndex = (i + 1) * self.batchsize

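            # Each mini-batch of reviews becomes a (10000, batchsize) multi-hot
            # matrix X (features in rows, examples in columns) and a row of
            # 0/1 labels Y for the same examples.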
            X = vectorize_sequences(self.x[dataStartIndex:dataEndIndex]).T
            Y = np.asarray(self.y[dataStartIndex:dataEndIndex]).astype('float32')

            param_w = self.param_w
            param_b = self.param_b

            layers_dims = self.layers_dims
            L = self.L

            m = X.shape[1]

            activations = [X] + [None] * L
            prev_activations = [None] * (L + 1)

            dA = [None] * (L + 1)
            dz = [None] * (L + 1)
            dw = [None] * (L + 1)
            db = [None] * (L + 1)

            ### forward propagation
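            # Forward pass, layer by layer:
            #   z^[l] = W^[l] a^[l-1] + b^[l]
            #   a^[l] = relu(z^[l]) for hidden layers, sigmoid(z^[L]) for the output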
            for l in range(1, L + 1):
                prev_activations[l] = np.dot(param_w[l], activations[l - 1]) + param_b[l]
                if l < L:
                    activations[l] = relu(prev_activations[l])
                else:
                    activations[l] = sigmoid(prev_activations[l])

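            # Cost for this batch: binary cross-entropy
            #   J = -1/m * sum_i [ y_i * log(a_i) + (1 - y_i) * log(1 - a_i) ]
            # plus an L2 penalty of lambda/(2m) * sum_l ||W^[l]||_F^2 when lambd > 0.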
            ### cross-entropy cost
            cross_entropy_cost = -1/m * (np.dot(np.log(activations[L]), Y.T)
                                         + np.dot(np.log(1 - activations[L]), 1 - Y.T))

            ### L2 regularization term
            regularization_cost = 0
            for l in range(1, L + 1):
                regularization_cost += np.sum(np.square(param_w[l])) * lambd / (2 * m)

            cost = cross_entropy_cost + regularization_cost

            ### initialize backward propagation
            dA[L] = np.divide(1 - Y, 1 - activations[L]) - np.divide(Y, activations[L])
            assert dA[L].shape == (1, m)

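            # Backward pass, layer by layer (l = L down to 1):
            #   dz^[l] = dA^[l] * g'(z^[l])          (sigmoid' at layer L, relu' otherwise)
            #   dW^[l] = 1/m * dz^[l] a^[l-1]^T + (lambda/m) * W^[l]
            #   db^[l] = 1/m * sum over examples of dz^[l]
            #   dA^[l-1] = W^[l]^T dz^[l]
            # For the sigmoid/cross-entropy output, dA^[L] = (1-y)/(1-a) - y/a,
            # which combined with sigmoid' = a(1-a) gives dz^[L] = a - y.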
            ### backward propagation (gradient descent is the general optimization
            ### algorithm; backpropagation is how its gradients are computed for a
            ### deep neural network)
            for l in reversed(range(1, L + 1)):
                if l == L:
                    dz[l] = dA[l] * activations[l] * (1 - activations[l])
                else:
                    dz[l] = dA[l].copy()
                    dz[l][prev_activations[l] <= 0] = 0

                dw[l] = 1/m * np.dot(dz[l], activations[l - 1].T) + param_w[l] * lambd / m
                db[l] = 1/m * np.sum(dz[l], axis=1, keepdims=True)
                dA[l - 1] = np.dot(param_w[l].T, dz[l])

                assert dz[l].shape == prev_activations[l].shape
                assert dw[l].shape == param_w[l].shape
                assert db[l].shape == param_b[l].shape
                assert dA[l - 1].shape == activations[l - 1].shape

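                # Gradient-descent update for this layer:
                #   W^[l] := W^[l] - learning_rate * dW^[l]
                #   b^[l] := b^[l] - learning_rate * db^[l]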
                param_w[l] = param_w[l] - learning_rate * dw[l]
                param_b[l] = param_b[l] - learning_rate * db[l]

            if i % 500 == 499:
                print("Loss on batch {}: {}".format(i + 1, cost))
                rcost = cost

            if i == loopSize - 1:
                rcost = cost

        self.param_w = param_w
        self.param_b = param_b

        return rcost

(train_data, train_labels), (test_data, test_labels) = imdb.load_data(path="imdb/imdb.npz", num_words=10000)

x_train = train_data[:25000]
x_test = test_data[:20]

y_train = train_labels[:25000]
y_test = test_labels[:20]

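# Train from scratch with batch size 16 for 100 epochs; uncomment model.load()
# below to resume from weights saved by an earlier run instead.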
model = FFNNModel(x_train, y_train, 16, 100)
### model.load()
model.train()

y_pre = model.predict(x_test)
print(y_test)
print(y_pre)

y_pre2 = model.predict(x_train[:20])
print(y_train[:20])
print(y_pre2)

sys.exit()