%%HTML
body{--vscode-font-family: "Menlo";}'''0_0'''
import collections
import matplotlib.pyplot as plt
import os
import random
import re
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset
%config InlineBackend.figure_format = 'svg'
gpu = torch.device("cuda:0")
gpu = torch.device("mps:0")
序列模型
- 马尔可夫假设,假设当前数据点只跟τ个过去的数据点有关。
$\mathscr{prob}(x_t|x_{t-1},...,x_1)=\mathscr{prob}(x_t|x_{t-1},...,x_{t-τ})$
- 潜变量,引入潜变量$h_t=g(x_{t-1},...,x_1)$来表示过去的信息。
$\mathscr{prob}(x_t|x_{t-1},...,x_1)=\mathscr{prob}(x_t|h_t)$
我们用正弦函数加上些噪音来生成序列数据,时刻为$1,2,...,1000.$。
'''1_1'''
moment_list = torch.arange(1, 1001, dtype=torch.float32)
price_list = torch.sin(0.01 * moment_list) + torch.normal(0, 0.2, (1000,))
plt.rcParams['font.family'] = ['Arial Unicode MS']
plt.figure(figsize=(6, 3))
plt.xlabel("时刻")
plt.ylabel("价格")
plt.grid(True)
plt.plot(moment_list, price_list)
plt.show()
'''1_2'''
Tt, tau = 1000, 4
形状(996, 4),996个样本,每个样本4个特征,特征即为当前时刻往前的4个历史价格。
features_2dlist = torch.zeros((Tt - tau, tau))
for j in range(tau):
按列填充,features_2dlist:取price_list[0:996]。
features_2dlist[:, j] = price_list[j:Tt-tau+j]
labels取price_list[4:1000]。
labels_2dlist = price_list[tau:].reshape((-1,1))
如此,features_2dlist0为price_list[0:4],labels_2dlist[0]为price_list[4]。
batch_size, num_dev = 16, 600
取前600个样本作为开发集,后396个样本作为测试集。
会有600/16=37个满批次,最后一个批次剩余600%16=8个样本。
iterate_devset = DataLoader(TensorDataset(
features_2dlist[:num_dev], labels_2dlist[:num_dev]), batch_size, shuffle=True)
'''1_3'''
votre modèle 法语:你的模型。
votre_modele = nn.Sequential(
nn.Linear(4, 10),
nn.ReLU(),
nn.Linear(10, 1))
def init_weights(mo):
泽维尔均匀分布权重初始化。
if isinstance(mo, nn.Linear):
nn.init.xavier_uniform_(mo.weight)
votre_modele.apply(init_weights)
loss_function = nn.MSELoss(reduction='none')
train_optimizer = optim.Adam(votre_modele.parameters(), lr=0.01)
def eval_loss_(your_model, iterate_data, loss_func):
"""评估模型在测给定数据集上的损失。"""
metric = [0.0, 0.0]
for X, y in iterate_data:
o = your_model(X)
SE = loss_func(o, y)
将torch.Tensor转为float,切断计算图,兼容matplotlib绘图。
metric[0] += SE.sum().item()
metric[1] += SE.numel()
return metric[0] / metric[1]
'''1_4'''
train_epochs = 5
训练。
for epoch in range(train_epochs):
for X, y in iterate_devset:
train_optimizer.zero_grad()
SE = loss_function(votre_modele(X), y)
SE.sum().backward()
train_optimizer.step()
print(f"epoch {epoch + 1}, loss {eval_loss_(votre_modele, iterate_devset, loss_function):.4f}!")
'''1_5'''
预测,往前走1步。
切换到评估模式,不计算梯度,节省内存。
votre_modele.eval()
with torch.no_grad():
preds_list = votre_modele(features_2dlist)
plt.rcParams['font.family'] = ['Arial Unicode MS']
plt.figure(figsize=(6, 3))
plt.xlabel("时刻")
plt.ylabel("价格")
plt.grid(True)
plt.plot(moment_list, price_list.detach().numpy(), label="真实")
plt.plot(moment_list[tau:], preds_list.detach().numpy(), label="一步预测", color="orange")
plt.legend()
plt.show()
'''1_6'''
预测,往前走到底(396步)。
preds_list = torch.zeros(Tt)
preds_list[:num_dev+tau] = price_list[:num_dev+tau]
切换到评估模式,不计算梯度,节省内存。
votre_modele.eval()
with torch.no_grad():
for i in range(num_dev+tau, Tt):
取前4个历史价格,预测下1步价格,直到走完。
preds_list[i] = votre_modele(preds_list[i-tau:i].reshape((1, -1)))
plt.rcParams['font.family'] = ['Arial Unicode MS']
plt.figure(figsize=(6, 3))
plt.xlabel("时刻")
plt.ylabel("价格")
plt.grid(True)
plt.plot(moment_list, price_list.detach().numpy(), label="真实")
plt.plot(moment_list[num_dev+tau:], preds_list[num_dev+tau:].detach().numpy(), label="多步预测", color="red")
plt.legend()
plt.show()
误差累积,红色折线可能往上走,也可能往下走,不过最后都趋于平缓。
'''1_7'''
预测,分1、4、16、64步走走看。
steps = 64
形状(933, 68),933个样本,每个样本68个特征,特征即为当前时刻往前的4个历史价格和往后预测的64个价格。
features_2dlist = torch.zeros((Tt-tau-steps+1, tau+steps))
for j in range(tau):
features_2dlist[:, j] = price_list[j:Tt-tau-steps+j+1]
for j in range(tau, tau+steps):
features_2dlist[:, j] = votre_modele(features_2dlist[:, j-tau:j]).reshape(-1)
step_list = [1, 4, 16, 64]
plt.rcParams['font.family'] = ['Arial Unicode MS']
plt.figure(figsize=(6, 3))
plt.xlabel("时刻")
plt.ylabel("价格")
plt.grid(True)
plt.plot(moment_list[tau:Tt-steps+1], features_2dlist[:, tau].detach().numpy(), label="1步")
plt.plot(moment_list[tau+3:Tt-steps+4], features_2dlist[:, tau+3].detach().numpy(), label="4步", color="orange")
plt.plot(moment_list[tau+15:Tt-steps+16], features_2dlist[:, tau+15].detach().numpy(), label="16步", color="magenta")
plt.plot(moment_list[tau+63:Tt-steps+64], features_2dlist[:, tau+63].detach().numpy(), label="64步", color="green")
plt.legend()
plt.show()
文本预处理
我们来读取一本《Time Machine》的英文小说。
'''2_1'''
file_path = os.path.expanduser("~/Public/DATAHUB/time_machine.txt")
with open(file_path, 'r') as txt:
novel_content = txt.readlines()
novel_content是一个一维列表,每个元素是小说的一行文本,包含换行。
去掉空行,所有标点变成空格(保持原来的断句),去掉首尾空格(至多各1个),全转为小写,
并将每行按单词分割成列表,所以novel_context是一个二维列表。
novel_context = [re.sub('1+', ' ', line).strip().lower().split() for line in novel_content if line.strip()]
print(f"小说总行数{len(novel_content)},文本行数{len(novel_context)}。")
将字符串类型的词元映射到从0开始的整数索引中,我们统计词元出现频率,为其分配索引,极少出现的词元将被移除,得到的统计结果称之为语料库"corpus"。
另外有一些保留词元,如:遇到语料库中不存在的词元都映射到“未知词元”"\<unk\>",“填充词元”"\<pad\>"、 “序列开始词元”"\<bos\>"、“序列结束词元”"\<eos\>"。
'''2_2'''
class CorpusVocabRegistry:
def __init__(self, novel_context, lower=1, reserved_tokens=()):
"""
novel_context:文本语料,一般是二维字符串列表,可以是一维字符串列表,也可能为空。
lower:词元频次下界,低于此下界认为是不常见的。
reserved_tokens:用户预定义的词元列表。
python不建议使用可变类型(如列表)作为缺省值,因为它们在函数定义时被创建,并且在后续调用中共享同一个对象。
若reserved_tokens缺省为[],声明两个实例时,两个实例的reserved_tokens是同一个对象。
"""
if novel_context is None: novel_context = []
self._make_corpus(novel_context, lower)
索引->词元@列表,词元->索引@字典。
self.index_to_token = ["
self.token_to_index = {token: index for index, token in enumerate(self.index_to_token)}
self._build_vocab()
def __len__(self):
# 底层是C语言结构体,len(self.index_to_token)的时间复杂度是O(1)。
return len(self.index_to_token)
def __getitem__(self, maybe_mdarr):
"""
maybe_mdarr可以是字符串、整数、列表、元组。
还可能是复杂的嵌套列表 [['a', 'b'], ['c']] 。
"""
if isinstance(maybe_mdarr, int):
# 整数,根据索引返回词元。
# 若index越界,则直接抛出异常。
return self.index_to_token[maybe_mdarr]
elif isinstance(maybe_mdarr, str):
# 字符串,根据词元返回索引。
# 若token不在词汇表中,则返回unk的索引0
return self.token_to_index.get(maybe_mdarr, 0)
# 处理列表、元组,甚至复杂的嵌套列表,递归调用__getitem__。
return [self[elem] for elem in maybe_mdarr]
def _make_corpus(self, novel_context, lower):
# 统计novel_context中词元的频率。
if len(novel_context) == 0:
# 传进来的就是空列表,或者没传novel_context参数。
# 虽然直接赋予dict()更节约,但为了保持类型一致,还是赋予collections.Counter()。
self.token_freqs_all = collections.Counter()
elif isinstance(novel_context[0], list):
# 传进来的novel_context是个二维列表,这是最常见的情况。
# 例如:[['the', 'cat', 'sat'], ['the', 'dog', 'barked']]
self.token_freqs_all = collections.Counter()
for line in novel_context:
self.token_freqs_all.update(line)
else:
# 传进来的novel_context是一维列表。
self.token_freqs_all = collections.Counter(novel_context)
# 丢掉频次小于freq_thresh的词元。
self.token_freqs_sift = {token: freq for token, freq in self.token_freqs_all.items() if freq >= lower}
# 按照频次从大到小排序。
self.decreasing_byfreq = sorted(self.token_freqs_sift.items(), key=lambda it: -it[1])
def _build_vocab(self):
idx = len(self.index_to_token)
for token, freq in self.decreasing_byfreq:
if token not in self.token_to_index:
# novel_context中的语料可能和用户预定义的reversed_tokens有重复,去掉重复的词元。
self.index_to_token.append(token)
self.token_to_index[token] = idx
idx += 1'''2_3'''
看一下《time_machine.txt》这本小说的词汇表。
vocab_room = CorpusVocabRegistry(novel_context)
print(f"一元语法词汇表大小:{len(vocab_room)},前10个词元:\n{vocab_room.index_to_token[:10]}。")
不包含用户预定义的词元⬇,出现频次最高的词往往对理解文本的主旨贡献很小,也被称为停用词。
print(vocab_room.token_freqs_all.most_common(10))
print(vocab_room.decreasing_byfreq[:10])
print(f"小说里拿2行文本,看看词元转索引:")
for i in (0, 2):
line = novel_context[i]
print(f"文本列表:{line}\n索引列表:{vocab_room[line]}")
'''2_4'''
对上面代码的封装。
def using_novel_do_tokenize(upper=-1):
由《time_machine.txt》这本小说构建语料库和词汇表。
file_path = os.path.expanduser("~/Public/DATAHUB/time_machine.txt")
with open(file_path, 'r') as txt:
novel_content = txt.readlines()
这回按照字母来统计了,不是按照词来统计了,novel_context仍是一个二维列表,每行按字母分割成列表。
novel_context = [list(re.sub('1+', ' ', line).strip().lower()) for line in novel_content if line.strip()]
print(len(novel_context), len(novel_context[0]))
vocab_room = CorpusVocabRegistry(novel_context)
corpus_bank = [vocab_room[lettre] for line in novel_context for lettre in line]
if upper > 0:
可以切出一块“迷你数据集”,便于调试RNN。
corpus_bank = corpus_bank[:upper]
return corpus_bank, vocab_room
corpus_bank, vocab_room = using_novel_do_tokenize()
print(len(corpus_bank), len(vocab_room))
语料库,看看小说前10个字母的索引。
print(corpus_bank[:10])
词汇表,26个小写字母+空格+unk。
print(vocab_room.index_to_token)
语言模型
'''3_1'''
file_path = os.path.expanduser("~/Public/DATAHUB/time_machine.txt")
with open(file_path, 'r') as txt:
novel_content = txt.readlines()
novel_context仍是一个二维列表,每行按单词分割成列表。
novel_context = [re.sub('1+', ' ', line).strip().lower().split() for line in novel_content if line.strip()]
将二维列表novel_context展平为一维列表。
novel_contest = [word for line in novel_context for word in line]
'''3_2'''
vocab_1gram= CorpusVocabRegistry(novel_contest)
token_tuple, freq_tuple = zip(*vocab_1gram.decreasing_byfreq)
只需表示词元的序号,不需要词元索引。
order_tuple = tuple(range(len(token_tuple)))
词频衰减速率很快,[10]词元的频次还不到[0]词元的频次的1/5。
print(freq_tuple[0], freq_tuple[10])
画个词频对数趋势图,看看词频衰减速率。
plt.rcParams['font.family'] = ['Arial Unicode MS']
plt.figure(figsize=(6, 3))
plt.xlabel("序号")
plt.ylabel("频次")
plt.xscale("log")
plt.yscale("log")
plt.grid(True)
plt.plot(order_tuple, freq_tuple, label="词频对数趋势图")
plt.legend()
plt.show()
'''3_3'''
二元语法,2-gram,bigram。
用一维列表novel_contest构建二元语法的语料库,bigram_contest是一个二维列表,每行是一个二元组。
bigram_contest = list(zip(novel_contest[:-1], novel_contest[1:]))
vocab_2gram = CorpusVocabRegistry(bigram_contest)
print(f"二元语法的词汇表大小:{len(vocab_2gram)},前10个词元:\n{vocab_2gram.decreasing_byfreq[:10]}。")
'''3_4'''
三元语法,3-gram,trigram。
用一维列表novel_contest构建三元语法的语料库,trigram_contest是一个二维列表,每行是一个三元组。
trigram_contest = list(zip(novel_contest[:-2], novel_contest[1:-1], novel_contest[2:]))
vocab_3gram = CorpusVocabRegistry(trigram_contest)
print(f"三元语法的词汇表大小:{len(vocab_3gram)},前10个词元:\n{vocab_3gram.decreasing_byfreq[:10]}。")
一元语法,前10个最频繁的词元里,10个是停用词;二元语法,前10个最频繁的词元里,9个是由二停用词结对(('the', 'time')不是);三元语法,前10个最频繁的词元里,三停用词的结对就很少啦。
'''3_5'''
token_2gram, freq_2gram = zip(*vocab_2gram.decreasing_byfreq)
token_3gram, freq_3gram = zip(*vocab_3gram.decreasing_byfreq)
print(len(token_tuple), len(token_2gram), len(token_3gram))
plt.rcParams['font.family'] = ['Arial Unicode MS']
plt.figure(figsize=(6, 3))
plt.xlabel("序号")
plt.ylabel("频次")
plt.xscale("log")
plt.yscale("log")
plt.grid(True)
3个列表的长度是不一样的,画在一起时,较短的列表会被自动补齐为较长的列表的长度,补齐部分的频次为0。
Matplotlib 的 plt.plot() 或 ax.plot() 只接收到一个参数(Y轴数据)时,自动将下标作为X轴数据。
但是对数缩放时,下标为0的频次被自动丢掉了,不追求严谨。
plt.plot(freq_tuple, label="一元语法")
plt.plot(freq_2gram, label="二元语法", color="orange", ls="--")
plt.plot(freq_3gram, label="三元语法", color="green", ls=":")
plt.legend()
plt.show()
读取超长序列
在训练序列模型,如语言模型、循环神经网络,由于原始文本序列(例如一整本小说)通常非常长,我们无法一次性将整本书喂给模型。因此,我们需要将超长序列切分成合适长度的子序列(长度通常记为num_steps),并将它们成组打包(batch_size)成迷你批量来训练。
“随机采样”"Random Sampling"和“顺序分区”"Sequential Partitioning"就是从长序列中截取和打包这些子序列的两种最核心的数据迭代策略。它们的区别在于相邻的两个mini-batch之间,是否有上下文的连续性。
- 随机采样
思想:完全打乱,互不相关。
在随机采样中,我们首先将超级长序列切分成许许多多长度为num_steps的独立小段。然后,我们像洗牌一样,把这些小段的顺序彻底打乱,最后再从中随机抓取 batch_size个小段拼成一个迷你批次。
工作过程:假设有一段超长文本被切分成了100个子序列。第1个迷你批次可能包含了[15],[88],[3]。 第2个迷你批次可能包含了[42],[9],[71]。
模型训练时的特点:因为前一个迷你批次和后一个迷你批次里的内容在时间上毫无关联,所以每次模型读入一个新的迷你批次时,都必须把隐藏状态清零。这就好比让模型每次都重新开始读一段毫无关系的新句子。
适用场景:你的数据本身就是由许多互相独立的短文本组成的(比如互相没有关联的商品评论、单句的机器翻译语料等)。
- 顺序分区
思想:保留顺序,状态传递。
在顺序分区中,我们不打乱数据,而是把长序列“折叠”起来读。我们保证当前批次里第$i$个位置的子序列,恰好是上一个批次里第$i$个位置的子序列的直接延续。
工作过程: 假设整篇文本有1000个词,我们设batch_size=2。分成2块:1块[0~499]、2块[500~999]。然后模型同时“两线并行”往下读,假设num_steps=10,第1个迷你批次读的是:[0~9]、[500~509]。 第2个迷你批次读的是:[10~19]、[510~519],完美衔接上一个迷你批次。
模型训练时的特点: 因为迷你批次之间是连贯的,模型在读完第1个迷你批次后脑海中产生的“记忆”(隐藏状态),可以直接保留下来,无缝传递给第2个迷你批次继续使用。这使得模型理论上能记住超越num_steps长度的上下文。
注:在代码实现中,由于计算图不能无限延伸,我们在跨迷你批次传递状态时,需要对隐藏状态调用.detach()来截断梯度的反向传播,这种技术叫截断时间反向传播 "Truncated BPTT"。
适用场景:你的数据是一篇长篇大论(比如一整本连续的小说、连续的股票时间序列),你希望模型能学到更长距离的上下文依赖。
'''4_1'''
def sampling_randomly_solong(your_oriseq, batch_size, num_steps):
从随机偏移下标开始对原序列分区,选到的下标之前的词丢掉。
offset = random.randint(0, num_steps - 1)
your_oriseq = your_oriseq[offset:]
可以切分出多少个子序列,为什么要-1呢?对于一个长度为num_steps的特征,需要下1个词作为标签,
为了保证最后一个特征也能取到对应标签,可用长度得-1。
num_subseqs = (len(your_oriseq) - 1) // num_steps
usable_length = num_subseqs * num_steps
长度为num_steps的子序列的起始下标。
比如:[0, 5, 10, 15, 20, 25]。
start_indices = list(range(0, usable_length, num_steps))
打乱,符合随机抽样策略,前一个batch和后一个batch没有上下文关联。
比如:[15, 0, 25, 5, 10, 20]。
random.shuffle(start_indices)
print(offset, start_indices)
从下标j开始的、长度为num_step的子序列。
get_subseq = lambda j: your_oriseq[j:j+num_steps]
所有切分好的子序列每batch_size个组成一个mini-batch,剩余不足batch_size个子序列的部分丢掉。
num_batches = num_subseqs // batch_size
effective_cnt = num_batches * batch_size
for i in range(0, effective_cnt, batch_size):
经过打乱,当前这个mini-batch,所有子序列在your_oriseq里的起始下标组成的列表。
i=0,start_batch=[15,0],取来(your_oriseq[15:20],your_oriseq[0:5])打包成第一个mini-batch。
i=2,start_batch=[25,5],取来(your_oriseq[25:30],your_oriseq[5:10])打包成第二个mini-batch。
start_batch = start_indices[i:i+batch_size]
print(i, start_batch)
X_2D = torch.tensor([get_subseq(j) for j in start_batch])
Y_2D = torch.tensor([get_subseq(j+1) for j in start_batch])
yield X_2D, Y_2D
your_oriseq = list(range(35))
for X_2D, Y_2D in sampling_randomly_solong(your_oriseq, 2, 5):
print(f"{X_2D}\tX\n{Y_2D}\tY")
'''4_2'''
def partiting_orderly_solong(your_oriseq, batch_size, num_steps):
从随机偏移下标开始对原序列分区,选到的下标之前的词丢掉。
offset = random.randint(0, num_steps)
减去offset后,your_oriseq能够被batch_size整除的最大可用词元总数,多余的丢弃。
比如:offset=1, usable_length=32。
usable_length = (len(your_oriseq) - offset - 1) // batch_size * batch_size
print(offset, usable_length)
完整特征矩阵、标签矩阵,长度为usable_length,重塑为(batch_size,usable_length//batch_size)。
相当于把原本一条长长的文本带,等分切成了batch_size截,然后像叠三明治一样把它们叠在一起。
比如:X_Complete=[1,2,...31], Y_Complete=[2,...,32],再重塑成2行16列。
X_Complete = torch.tensor(your_oriseq[offset:offset+usable_length]).reshape(batch_size, -1)
Y_Complete = torch.tensor(your_oriseq[offset+1:offset+usable_length+1]).reshape(batch_size, -1)
能够被num_steps整除的最大有效列数,多余的丢弃。
effective_cnt = usable_length // batch_size // num_steps * num_steps
for i in range(0, effective_cnt, num_steps):
用宽度为num_steps的框去X_Complete跳跃截取列。
i=0,截取[0,1,2,3,4]列,取来[[1,2,3,4,5],[17,18,19,20,21]]打包成第一个mini-batch。
i=5,截取[5,6,7,8,9]列,取来[[6,7,8,9,10],[22,23,24,25,26]]打包成第二个mini-batch。
X_2D = X_Complete[:, i:i+num_steps]
Y_2D = Y_Complete[:, i:i+num_steps]
yield X_2D, Y_2D
your_oriseq = list(range(35))
for X_2D, Y_2D in partiting_orderly_solong(your_oriseq, 2, 5):
print(f"{X_2D}\tX\n{Y_2D}\tY")
原序列[0...34],34//5=6个“特征-标签”子序列,6//2=3个mini-batch。
由X[0][0:1]去预测Y[0][0],由X[0][0:2]去预测Y[0][1],由X[0][0:3]去预测Y[0][2],。。。
由X[1][0:1]去预测Y[1][0],由X[1][0:2]去预测Y[1][1],由X[1][0:3]去预测Y[1][2],。。。
'''4_3'''
封装上面2个策略。
def load_data_time_machine(batch_size, num_steps, tactic=1, upper=10000):
"""
tatic:数据迭代策略,-可选(0'随机采样', 1'顺序分区')
upper:切出一块迷你数据集的词元数量
"""
corpus_bank, vocab_room = using_novel_do_tokenize(upper)
class SeqLoader:
闭包类。
def __iter__(self):
使用闭包类.__iter__的好处是,每个epoch的for X_2D,Y_2D in iterate_sequence时,都会重新产生一遍数据供迭代。
而在工厂函数里return iter()的弊端是第1次epoch的for就会耗尽所有数据,第2个epoch就没有了。
if tactic == 1:
return iter(partiting_orderly_solong(corpus_bank, batch_size, num_steps))
else:
return iter(sampling_randomly_solong(corpus_bank, batch_size, num_steps))
iterate_sequence = SeqLoader()
return iterate_sequence, vocab_room
- A-Za-z ↩