%%HTML
body{--vscode-font-family: "Menlo";}'''0_0'''
import importlib
import LIMU
import math
import torch
from torch import nn
from torch.nn import functional as F
%config InlineBackend.figure_format = 'svg'
%matplotlib inline
gpu = torch.device("cuda:0")
gpu = torch.device("mps:0")
'''0_1'''
Jupyter,如果LIMU模块有变化,需要重新加载。
importlib.reload(LIMU)
from LIMU import Chapter05
循环神经网络
Recurrent Neural Network
带有记忆反馈环路的多层感知机,专门用于处理序列数据,如文本、语音、时间序列数据。
$H_t=\mathcal{a}(\mathbf{X}_{t}\mathbf{W}_{xh}+\mathbf{H}_{t-1}\mathbf{W}_{hh}+\mathbf{b}_{h})$
工作原理
包含一个循环,允许信息在网络内部传递。
在每一个时间步$t$:
- 神经网络接收当前的输入数据$x_t$。
- 神经网络接收来自上一个时间步的隐藏状态"Hidden State"$h_{t-1}$,这个隐藏状态就相当于网络的“记忆”。
- 神经网络将$x_t$和$h_{t-1}$结合起来,通过激活函数计算出当前的隐藏状态$h_t$。
- 当前的隐藏状态$h_t$既可以用于计算当前时间步的输出$y_t$,也会被传递到下一个时间步$t+1$。
困惑度
衡量一个语言模型好坏最常用的指标。
一个序列中所有的$m$个词元的交叉熵损失的平均值:
$\mathcal{ce}=-\frac{1}{m}\sum\log{\mathbb{P}(x_{t}|x_{t-1},...,x_{1})}$
其中,$\mathbb{P}()$由语言模型给出,$x_{t}$是在时间步$t$的实际词元。
一般算出来之后,再放到自然底数上$Perplexity=\exp{\mathcal({ce})}$。
直观理解一下:
$Perplexity=10$,模型在每个时间步预测时,其不确定性等价于从10个候选词中随机挑选1个。$Perplexity=100$,模型在每个时间步预测时,其不确定性等价于从100个候选词中随机挑选1个。
独热编码
'''1_1'''
时间步数35,每32个序列组成迷你批次,batch_size=32, num_steps=35。
iterate_dev, vocab_room = Chapter05.load_data_time_machine(32, 35)
看一下独热编码,len(vocab_room)=28,26个英文字母+空格+unk。
print(F.one_hot(torch.tensor([0,2]), len(vocab_room)))
batch_size=2, num_steps=5。
X_2D = torch.arange(10).reshape((2, 5))
独热编码时转置一下,形状(num_steps, batch_size, vocab_size),
使得num_steps在前,符合训练循环神经网络时,是按时间步进行迭代的。
print(F.one_hot(X_2D.T, 28).shape)
RNN的从零开始实现
'''2_1'''
class RNNScratch:
"""From Scratch 从零开始实现的循环神经网络,这是一个父类,普通RNN、GRU、LSTM都要继承自此类。"""
def __init__(self, vocab_size, num_hiddens, device=gpu):
self.vocab_size = vocab_size
self.num_hiddens = num_hiddens
self.device = device
self.variable_list = []
self.setrnn_variable()
def __call__(self, X_2D, state):
X_2D = F.one_hot(X_2D.T, self.vocab_size).type(torch.float32)
return self.rnn_forward(X_2D, state)
def begin_state(self, batch_size):
"""RNN在0时刻没有上一时间步的隐状态,要人为赋予。"""
return (torch.zeros((batch_size, self.num_hiddens), device=self.device), )
def setrnn_variable(self):
"""留给子类实现:调整RNN模型的参数。"""
raise NotImplementedError
def rnn_forward(self, inputs__list, state):
"""留给子类实现:在一个时间步的前向传播。"""
raise NotImplementedError'''2_2'''
class RNNOrdinary(RNNScratch):
"""普通循环神经网络。"""
def setrnn_variable(self):
"""调整RNN模型的参数。"""
# 输入是当前词元的独热编码,
# 模型的任务是推理下一个词元,输出是一个在整个词汇表上的概率分布,
# 因此,输入特征数、输出特征数都等于词汇表大小。
num_inputs = num_outputs_list = self.vocab_size
init_weights = lambda shape: torch.randn(size=shape, device=self.device) * 0.01
# 输入X到隐状态H的权重矩阵。
W_xh = init_weights((num_inputs, self.num_hiddens))
# RNN的关键,比MLP多了下面这层。
# 上一个时间步的隐状态H到当前隐状态H的权重矩阵。
W_hh = init_weights((self.num_hiddens, self.num_hiddens))
# 隐状态的偏置项。
b_h = torch.zeros(self.num_hiddens, device=self.device)
# 隐状态H到输出Q的权重矩阵。
W_hq = init_weights((self.num_hiddens, num_outputs_list))
# 输出层的偏置项。
b_q = torch.zeros(num_outputs_list, device=self.device)
self.variable_list = [W_xh, W_hh, b_h, W_hq, b_q]
for param in self.variable_list:
param.requires_grad_(True)
def rnn_forward(self, inputs__list, state):
"""RNN在一个时间步内计算隐状态和输出。"""
W_xh, W_hh, b_h, W_hq, b_q = self.variable_list
H, = state
outputs_list = []
# inputs__list的形状是(num_steps, batch_size, vocab_size)。
for X in inputs__list:
# X、Y的形状都是(batch_size, vocab_size)。
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs_list.append(Y)
# outputs_list的形状是(num_steps, batch_size, vocab_size),
# 拼接后变成(num_steps * batch_size, vocab_size)。
# 损失函数希望接收的参数形状也是(num_samples, vocab_size)。
return torch.cat(outputs_list, dim=0), (H,)'''2_3'''
votre_modele = RNNOrdinary(len(vocab_room), 512)
state = votre_modele.begin_state(X_2D.shape[0])
Y_2D, state = votre_modele(X_2D.to(gpu), state)
输出的形状是(num_steps * batch_size, vocab_size),
隐状态的形状保持不变。
print(Y_2D.shape, type(state), len(state), state[0].shape)
'''2_4'''
def inferring_characters(votre_modele, vocab_room, prefix_string, num_infers):
"""推理用户提供的prefix_string后面是什么样的字符。"""
batch_size=1,做个短期记忆。
state = votre_modele.begin_state(1)
outputs_list = [vocab_room[prefix_string[0]]]
获取最新推理出的字符,在词汇表中的索引作为下一步的输入,重塑为(batch_size=1, num_steps=1)。
get_input = lambda: torch.tensor([outputs_list[-1]], device=votre_modele.device).reshape((1, 1))
预热期,warm-up,不断地将隐状态传递到下一个时间步,但是不产生任何输出,此期间模型会自我更新。
for c in prefix_string[1:]:
充分吸收传入字符串中的信息。
_, state = votre_modele(get_input(), state)
outputs_list.append(vocab_room[c])
推理num_infers步。
for _ in range(num_infers):
Y_2D形状(1×1,28),代表了对下一个字符的打分logits。
Y_2D, state = votre_modele(get_input(), state)
.argmax(dim=1),沿着[1]维寻找最大分的索引,形状是(1,),重塑为标量。
outputs_list.append(int(Y_2D.argmax(dim=1).reshape(1)))
return ''.join([vocab_room[c] for c in outputs_list])
votre_modele并没经过训练,模型在乱猜,困惑度很高。
inferring_characters(votre_modele, vocab_room, "time traveller", 10)
梯度裁剪
为梯度的大小设定一个上限,如果梯度超过了这个上限,就将其“裁剪”回这个范围之内。
在训练循环神经网络时,梯度爆炸很常见,它们在时间序列上反复乘以相同的权重矩阵,这种连乘效应很容易导致梯度指数级增长。
若梯度大小$|\mathbf{grad}|$超过$𝛩$,则按比例缩回,$\mathbf{grad}=\mathrm{min}\left(1,\frac{𝛩}{|\mathbf{grad}|}\right)\cdot\mathbf{grad}$。
'''2_5'''
def gradient_clipping(votre_modele, theta):
"""裁剪梯度。"""
if isinstance(votre_modele, nn.Module):
variable_list = [pa for pa in votre_modele.parameters() if pa.requires_grad]
else:
Jupyter.RNNScratch实例。
variable_list = votre_modele.variable_list
将所有参数的梯度拼接成一个长向量,然后计算其L2 norm(L2范数,即模长)。
norm2 = torch.sqrt(torch.stack([torch.sum(vr.grad**2) for vr in variable_list if vr.grad is not None]).sum())
if norm2 > theta:
ratio = theta / norm2
for vr in variable_list:
vr.grad.data.mul_(ratio) if vr.grad is not None else None
训练RNN
'''3_1'''
iterate_dev, vocab_room = Chapter05.load_data_time_machine(32, 35)
votre_modele = RNNOrdinary(len(vocab_room), 512)
策略1,顺序分区。
Chapter05.training_reasoner_1on_time_machine(votre_modele, gpu,
vocab_room, 500, 1.0, iterate_dev)
'''3_2'''
iterate_dev, vocab_room = Chapter05.load_data_time_machine(32, 35)
votre_modele = RNNOrdinary(len(vocab_room), 512)
策略0,随机采样。
Chapter05.training_reasoner_1on_time_machine(votre_modele, gpu,
vocab_room, 500, 1.0, iterate_dev, 0)
RNN的简洁实现
'''4_1'''
batch_size=32, num_steps=35, num_hiddens=512.。
iterate_dev, vocab_room = Chapter05.load_data_time_machine(32, 35)
rnn_layer = nn.RNN(len(vocab_room), 512)
nn.Module实例,手动初始化0时刻隐状态。
state = torch.zeros((1, 32, 512))
X_3D = torch.rand(size=(35, 32, len(vocab_room)))
Y_3D, state = rnn_layer(X_3D, state)
print(Y_3D.shape, state.shape)
'''4_2'''
class RNNSimple(nn.Module):
def __init__(self, rnn_layer, vocab_size):
super().__init__()
self.rnn_layer = rnn_layer
self.vocab_size = vocab_size
self.num_hiddens = rnn_layer.hidden_size
if not rnn_layer.bidirectional:
RNN双向的。
self.direct = 1
self.linear_layer = nn.Linear(self.num_hiddens, vocab_size)
else:
self.direct = 2
self.linear_layer = nn.Linear(self.num_hiddens * 2, vocab_size)