
Building an MLP That Supports Backpropagation Using the Computational Graph Method

The mathematical proof of the automatic differentiation mechanism is placed at the end of this article; the program implementation is given first. Since I derived the math by hand, there may be mistakes.

Python version: 3.12.4, numpy version: 1.26.4, sklearn version: 1.3.0

Computational Graph Definition

A computational graph is the technique used in PyTorch and TensorFlow to perform automatic differentiation, which backpropagates the error and thereby computes the gradient of every parameter; this makes it convenient to update the parameters of a neural network with those gradients. PyTorch uses a dynamic computational graph design, whereas TensorFlow uses a static one.

In our implementation, the computational graph and the automatic differentiation system are "embedded" in the layer definitions. PyTorch, by contrast, defines computational graph base classes in its source code and builds the graph through operator overloading and similar mechanisms.
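To illustrate that operator-overloading idea, here is a minimal sketch in the spirit of PyTorch's autograd; it is not the implementation used in this article, and the class name Node and its fields are hypothetical. Each operation records its parent nodes together with a local backward rule, and calling backward() walks the recorded graph in reverse while accumulating gradients.

class Node:
    '''A hypothetical scalar autograd node, for illustration only.'''
    def __init__(self, value, parents=(), backward_fn=lambda grad: ()):
        self.value = value
        self.grad = 0.0
        self.parents = parents          # nodes this one was computed from
        self.backward_fn = backward_fn  # distributes grad to the parents

    def __add__(self, other):
        out = Node(self.value + other.value, (self, other))
        out.backward_fn = lambda grad: (grad, grad)  # d(a+b)/da = d(a+b)/db = 1
        return out

    def __mul__(self, other):
        out = Node(self.value * other.value, (self, other))
        out.backward_fn = lambda grad: (grad * other.value, grad * self.value)
        return out

    def backward(self, grad=1.0):
        # Simple recursive reverse sweep (no topological sort), fine for tiny graphs
        self.grad += grad
        for parent, parent_grad in zip(self.parents, self.backward_fn(grad)):
            parent.backward(parent_grad)

# Example: y = (a + b) * a, so dy/da = 2a + b, dy/db = a
a, b = Node(2.0), Node(3.0)
y = (a + b) * a
y.backward()
print(a.grad, b.grad)  # 7.0 2.0

The implementation used in this article instead hard-wires the forward and backward rules into each layer class, as the following code shows.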

import numpy as np
from sklearn.datasets import make_moons


class Linear:
    def __init__(self, inputFeatures, outputFeatures, bias=True):
        # Randomly initialise the weight matrix (inputFeatures x outputFeatures)
        self.weights = np.random.rand(inputFeatures, outputFeatures)
        self.bias = np.random.rand(outputFeatures) if bias else None

    def __call__(self, x):
        # Cache the input: it is needed to compute the weight gradient in backward()
        self.input = x
        self.output = x @ self.weights
        if self.bias is not None:
            self.output += self.bias
        return self.output

    def parameters(self):
        if self.bias is not None:
            return [self.weights, self.bias]
        return [self.weights]

    def backward(self, grad_output, learning_rate):
        # grad_output is dL/dz; grad_input (dL/dx) is passed to the previous layer
        grad_input = grad_output @ self.weights.T
        grad_weights = self.input.T @ grad_output
        grad_bias = np.sum(grad_output, axis=0) if self.bias is not None else None

        # Plain gradient-descent update of the layer parameters
        self.weights -= learning_rate * grad_weights
        if self.bias is not None:
            self.bias -= learning_rate * grad_bias

        return grad_input


class Sigmoid:
    def __call__(self, x):
        self.output = 1 / (1 + np.exp(-x))
        return self.output

    def backward(self, grad_output, learning_rate):
        # sigma'(z) = sigma(z) * (1 - sigma(z)); element-wise (Hadamard) product
        grad_input = grad_output * self.output * (1 - self.output)
        return grad_input


class Softmax:
    def __call__(self, x):
        # Subtract the row-wise max for numerical stability
        self.output = np.exp(x - np.max(x, axis=1, keepdims=True))
        self.output /= np.sum(self.output, axis=1, keepdims=True)
        return self.output

    # backward() is not needed here: the softmax gradient is fused with the
    # cross-entropy gradient inside MLP.backwardAndGradientDescent below.
    # def backward(self, grad_output, learning_rate):
    #     grad_input = grad_output.copy()
    #     batch_size = grad_output.shape[0]
    #     for i in range(batch_size):
    #         y = self.output[i]
    #         jacobian = np.diag(y) - np.outer(y, y)
    #         grad_input[i] = jacobian @ grad_output[i]
    #     return grad_input


class Sequential:
    def __init__(self, layers):
        self.layers = layers

    def __call__(self, x):
        for layer in self.layers:
            x = layer(x)
        self.output = x
        return self.output

    def predict_proba(self, x):
        logits = self(x)
        e_x = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        return e_x / e_x.sum(axis=1, keepdims=True)

    def parameters(self):
        # Activation layers carry no parameters, so only collect from layers that do
        return [p for layer in self.layers
                if hasattr(layer, 'parameters') for p in layer.parameters()]
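The backward rules above can be sanity-checked against numerical differentiation. The snippet below is my own rough check, not part of the original code; the names linear, act and scalar_loss are ad-hoc. It feeds a small batch through a Linear + Sigmoid pair, takes the sum of the outputs as a scalar "loss" (so the upstream gradient is all ones), and compares the analytic input gradient returned by backward() with a central finite-difference estimate. learning_rate=0 is passed so the check does not modify the weights.

# Hedged sanity check of backward() via central finite differences
np.random.seed(0)
linear, act = Linear(3, 2), Sigmoid()
x = np.random.rand(4, 3)                      # batch of 4 samples, 3 features

def scalar_loss(inp):
    return np.sum(act(linear(inp)))           # L = sum(sigmoid(x @ W + b))

# Analytic gradient: upstream dL/dy is all ones, backprop with learning_rate=0
loss = scalar_loss(x)
grad = act.backward(np.ones((4, 2)), 0.0)
grad = linear.backward(grad, 0.0)

# Numerical gradient by central differences
eps = 1e-6
num_grad = np.zeros_like(x)
for i in range(x.shape[0]):
    for j in range(x.shape[1]):
        x_plus, x_minus = x.copy(), x.copy()
        x_plus[i, j] += eps
        x_minus[i, j] -= eps
        num_grad[i, j] = (scalar_loss(x_plus) - scalar_loss(x_minus)) / (2 * eps)

print(np.max(np.abs(grad - num_grad)))        # should be tiny, around 1e-8 or smaller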

MLP Definition Based on the Computational Graph

class MLP:
    def __init__(self):
        self.model = Sequential([
            Linear(2, 4), Sigmoid(),
            Linear(4, 4), Sigmoid(),
            Linear(4, 2)
        ])
        self.softmax = Softmax()

    def __call__(self, x):
        return self.model(x)

    def forward(self, x):
        return self.model(x)

    def backwardAndGradientDescent(self, x, y, learning_rate):
        '''
        x: input data vector, like [[0.5, -1.2], [0.7, 0.3], [-0.2, 0.8]]
        y: labels vector, like [0, 1, 0]
        '''
        batch_size = x.shape[0]
        logits = self.forward(x)
        probs = self.softmax(logits)

        # Fused softmax + cross-entropy gradient w.r.t. the logits: p - one_hot(y)
        grad_output = probs.copy()
        grad_output[range(batch_size), y] -= 1
        grad_output /= batch_size

        # Propagate the gradient backwards, updating parameters layer by layer
        for layer in reversed(self.model.layers):
            grad_output = layer.backward(grad_output, learning_rate)

    def probability(self, x):
        '''
        x: input data vector, like [[0.5, -1.2], [0.7, 0.3], [-0.2, 0.8]]
        return: probability vector, like [[0.88, 0.12], [0.45, 0.55], [0.31, 0.69]]
        '''
        logits = self.forward(x)
        return self.softmax(logits)

    def classify(self, x):
        return np.argmax(self.probability(x), axis=1)
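Before training, a quick smoke test (my own addition, not in the original post; mlp_test and x_demo are throwaway names) confirms that the shapes line up and that probability() returns valid distributions:

# Quick smoke test of the untrained MLP on a tiny batch
mlp_test = MLP()
x_demo = np.array([[0.5, -1.2], [0.7, 0.3], [-0.2, 0.8]])

probs = mlp_test.probability(x_demo)
print(probs.shape)                # (3, 2): one distribution per sample
print(probs.sum(axis=1))          # each row sums to 1
print(mlp_test.classify(x_demo))  # e.g. array([0, 1, 1]); arbitrary before training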

Training Process

X, y = make_moons(n_samples=1000, noise=0.1)
batch_size = 50
max_steps = 50000
learning_rate = 0.05
mlp = MLP()
lossRecord = []

for step in range(max_steps):
    # Reshuffle the data at the start of every pass
    indices = np.arange(X.shape[0])
    np.random.shuffle(indices)
    X, y = X[indices], y[indices]

    # Mini-batch gradient descent over the whole data set
    for i in range(0, X.shape[0], batch_size):
        X_batch = X[i:i+batch_size]
        y_batch = y[i:i+batch_size]
        mlp.backwardAndGradientDescent(X_batch, y_batch, learning_rate)

    if step % 10000 == 0:
        # Cross-entropy loss of the last mini-batch
        proba_batch = mlp.probability(X_batch)
        epsilon = 1e-12
        loss = -np.mean(np.log(proba_batch[range(len(y_batch)), y_batch] + epsilon))
        lossRecord.append(loss)
        print(f"Step {step}, Loss: {loss}")

Output:

Step 0, Loss: 0.6577937985199145
Step 10000, Loss: 0.00873273601484354
Step 20000, Loss: 0.00953517332872011
Step 30000, Loss: 0.0031810759662180698
Step 40000, Loss: 2.1824599140328784e-05
Step 50000, Loss: 0.00025276002897095404
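The loss values above are from the author's run. As an extra check (my own addition, not in the original post), the training-set accuracy after the loop finishes can be computed like this:

# Training-set accuracy of the trained model
predictions = mlp.classify(X)
accuracy = np.mean(predictions == y)
print(f"Training accuracy: {accuracy:.4f}")

The loss curve itself is plotted with the code below.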
import matplotlib.pyplot as plt

# Losses were recorded every 10000 steps
steps = np.arange(len(lossRecord)) * 10000
plt.rcParams.update({
    'font.family': 'serif', 'font.serif': ['Times New Roman'],
    'axes.titlesize': 14, 'axes.labelsize': 12,
    'xtick.labelsize': 10, 'ytick.labelsize': 10,
    'lines.linewidth': 2, 'lines.markersize': 5,
    'figure.figsize': (8, 6), 'axes.grid': True,
    'grid.alpha': 0.5, 'grid.linestyle': '--', 'grid.color': 'gray'
})
plt.plot(steps, lossRecord, color='#9b95c9', label='Training Loss')
plt.title('Training Loss Curve', fontsize=16)
plt.xlabel('Steps', fontsize=12)
plt.ylabel('Loss', fontsize=12)
plt.legend(loc='upper right', fontsize=12)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.tight_layout()
# plt.savefig("training_loss_curve.svg", format="svg")
plt.show()
[Figure: training loss curve]

Hyperplane Visualization

The original visualization code comes from Mr. Tang Gen's (唐亘) GitHub project; he demonstrates it in his video 徒手实现多层感知器--人工智能的创世纪 ("Implementing a Multilayer Perceptron by Hand: The Genesis of Artificial Intelligence"). On top of his visualization code I made a number of changes that make the plots quite a bit nicer, at least to my eye.

def draw_data(data):
    '''
    Visualise the data points
    '''
    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(1, 1, 1)
    x, y = data
    label1 = x[y > 0]
    ax.scatter(label1[:, 0], label1[:, 1], marker='o', color='royalblue', label='Class 1', alpha=0.7)
    label0 = x[y == 0]
    ax.scatter(label0[:, 0], label0[:, 1], marker='^', color='darkorange', label='Class 0', alpha=0.7)
    ax.set_title('Data Points Visualization', fontsize=16)
    ax.set_xlabel('Feature 1', fontsize=14)
    ax.set_ylabel('Feature 2', fontsize=14)
    ax.tick_params(axis='both', which='major', labelsize=12)
    ax.legend(loc='upper right', fontsize=12)
    ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.5)
    return ax


def draw_model(ax, model):
    '''
    Visualise the model's separating hyperplane (decision boundary)
    '''
    x1 = np.linspace(ax.get_xlim()[0], ax.get_xlim()[1], 100)
    x2 = np.linspace(ax.get_ylim()[0], ax.get_ylim()[1], 100)
    x1, x2 = np.meshgrid(x1, x2)
    # Class-1 probability on a grid covering the plot area
    y = model.probability(np.c_[x1.ravel(), x2.ravel()])[:, 1]
    y = y.reshape(x1.shape)
    ax.contourf(x1, x2, y, levels=[0, 0.5], colors=['#9B95C9'], alpha=0.15)
    ax.contour(x1, x2, y, levels=[0.5], colors='#7B68EE', linewidths=2, linestyles='solid')
    ax.set_title('Decision Boundary', fontsize=16)
    ax.set_xlabel('Feature 1', fontsize=14)
    ax.set_ylabel('Feature 2', fontsize=14)
    ax.tick_params(axis='both', which='major', labelsize=12)
    ax.grid(True, linestyle='--', linewidth=0.5, alpha=0.5)
    return ax
draw_data([X, y])
plt.tight_layout()
# plt.savefig("data_distribution.svg", format="svg")
plt.show()
[Figure: data distribution]
draw_model(draw_data([X, y]), mlp)
plt.tight_layout()
# plt.savefig("hyperplane.svg", format="svg")
plt.show()
[Figure: hyperplane]

Mathematical Proof of the Automatic Differentiation Mechanism

In an MLP, assume batch_size = 1, write the data $x$ as a row vector to match the code's x @ weights convention, and use numerator layout for vector derivatives. Let the intermediate variable $z = xW + b$ denote the value after the linear transformation and before the nonlinear map (the activation function), which can also be written in scalar form as $z_j = \sum_i x_i w_{ij} + b_j$, and let $a = \sigma(z)$, i.e. $a_j = \sigma(z_j)$, be the activation output. By the chain rule, once the gradient has propagated to the activation, i.e. $\frac{\partial L}{\partial a}$ is known (where $L$ is the loss function), then $\frac{\partial L}{\partial z_j} = \frac{\partial L}{\partial a_j}\,\sigma'(z_j)$, or in vector form $\frac{\partial L}{\partial z} = \frac{\partial L}{\partial a} \odot \sigma'(z)$. So during backpropagation an activation function should pass $\frac{\partial L}{\partial a} \odot \sigma'(z)$ down to the layer below. An activation function carries no parameters and therefore performs no parameter update; its only task is to relay the gradient. Here $\odot$ denotes the Hadamard (element-wise) product, which corresponds to the operator * in numpy. For the sigmoid, $\sigma'(z) = \sigma(z)\bigl(1 - \sigma(z)\bigr) = a(1 - a)$, which is exactly what Sigmoid.backward computes.

Now consider $\frac{\partial L}{\partial W}$, $\frac{\partial L}{\partial b}$ and $\frac{\partial L}{\partial x}$ one by one. For $W$, we have $\frac{\partial z_j}{\partial w_{ij}} = x_i$, so $\frac{\partial L}{\partial w_{ij}} = x_i \frac{\partial L}{\partial z_j}$, i.e. $\frac{\partial L}{\partial W} = x^{\top} \frac{\partial L}{\partial z}$. Next consider the bias: clearly $\frac{\partial z_j}{\partial b_j} = 1$, so $\frac{\partial L}{\partial b} = \frac{\partial L}{\partial z}$. When batch_size is greater than 1, the bias gradient should therefore be np.sum(grad_output, axis=0) rather than grad_output, because the per-sample gradients must be summed over the batch; for the other gradients this summation is already implicit in the matrix / tensor multiplication, but for the bias it has to be implemented explicitly.
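As a quick illustration of that batch-sum point, here is a toy numeric check of my own (not from the original post; x_demo, W_demo, b_demo, C and loss_of_bias are made-up names). It builds a toy loss whose gradient with respect to $z = xW + b$ is exactly a fixed matrix C and confirms numerically that the bias gradient equals C.sum(axis=0):

# Numeric check that the bias gradient sums grad_output over the batch axis
np.random.seed(1)
x_demo = np.random.rand(5, 3)                 # batch of 5 samples
W_demo = np.random.rand(3, 2)
b_demo = np.random.rand(2)
C = np.random.rand(5, 2)                      # plays the role of grad_output = dL/dz

def loss_of_bias(b):
    return np.sum((x_demo @ W_demo + b) * C)  # toy scalar loss with dL/dz = C

eps = 1e-6
num_grad_b = np.array([
    (loss_of_bias(b_demo + eps * np.eye(2)[j]) - loss_of_bias(b_demo - eps * np.eye(2)[j])) / (2 * eps)
    for j in range(2)
])
print(num_grad_b)       # matches ...
print(C.sum(axis=0))    # ... np.sum(grad_output, axis=0), up to rounding error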

Finally consider $\frac{\partial L}{\partial x}$. By the same reasoning, $\frac{\partial z_j}{\partial x_i} = w_{ij}$, and therefore $\frac{\partial L}{\partial x} = \frac{\partial L}{\partial z}\, W^{\top}$. Among these, $\frac{\partial L}{\partial W}$ and $\frac{\partial L}{\partial b}$ take part in the parameter update through gradient descent or one of its variants, while $\frac{\partial L}{\partial x}$ is passed on to the layer or activation function below. To be able to update the network parameters, each layer must store its input during the forward pass so that the gradients can be computed during the backward pass.
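To make these formulas concrete, here is a small worked example with made-up numbers: a single sample with two inputs, a layer with two outputs, and an assumed upstream gradient $\frac{\partial L}{\partial z}$.

$$
x = \begin{pmatrix} 1 & 2 \end{pmatrix}, \qquad
W = \begin{pmatrix} 1 & 2 \\ 3 & 4 \end{pmatrix}, \qquad
\frac{\partial L}{\partial z} = \begin{pmatrix} 0.5 & -1 \end{pmatrix}
$$

$$
\frac{\partial L}{\partial W} = x^{\top}\frac{\partial L}{\partial z}
= \begin{pmatrix} 0.5 & -1 \\ 1 & -2 \end{pmatrix}, \qquad
\frac{\partial L}{\partial b} = \frac{\partial L}{\partial z}
= \begin{pmatrix} 0.5 & -1 \end{pmatrix}, \qquad
\frac{\partial L}{\partial x} = \frac{\partial L}{\partial z}\,W^{\top}
= \begin{pmatrix} -1.5 & -2.5 \end{pmatrix}
$$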

The backward methods in the code above are precisely the programmatic implementation of this derivation.

[Figure: gradient descent]