循环神经网络(RNN)

为什么需要RNN

合并+ padding 的缺点

  • 信息丢失
    • 多个 embedding 合并 (求均值)
    • Pad 噪音、 无主次 (padding 太多, 即便没有 padding, 一些重要的表达情感的词语, 没有主次)
  • 无效计算太多,低效
    • 太多的 padding
  • 序列问题:由于普通的神经网络(比如Embedding)是将多个句子词语通过全局平均池化合并成一个向量,来表示文章的积极和消极,这就导致信息丢失,除此之外,全局平均池化也没有考虑句子的语序问题,一个词语出现的先后效果也是不同的。
  • 图 1: 普通神经网络( Vanilla Neural Networks ) 一个样本对应一个输出(卷积神经网络,全连接神经网络)
  • 图 2: 1 对多:图片 生成描述
  • 图 3: 多对 1:文本分类(文本情感分析)
  • 图 4: 多对多: encoding-decoding,机器翻译, 需要在输入最后一个结束后, 才能进行输出,是非实时的
  • 图 5: 实时多对多:视频解说

核心: 维护一个状态作为下一步的额外输入
每一步使用同样的激活函数和参数 也就是 RNN 的 w 是不变的, 如下图所示

这里有知乎的解释 https://zhuanlan.zhihu.com/p/30844905

SimpleRNN

是一个单向单层循环神经网络

  • 这里的(16,64)就是对应的(embedding dim, hidden dim)
  • W就是(64,64),U就是(16,64)这两部份就是RNN的参数

单向双层RNN

  • RNN(num_layers=2):num_layers置为2

双向单层RNN

单向进行的时候,最前面的词往往遗忘的更快,也即衰减的更多,所以为了保持前后的影响一致,从后到前再做一个

  • RNN(bidirectional=True),bidirectional参数置为True
  • 双向参数量翻倍, 用另外一组参数做逆向
  • 因为是相当于使用同样的一层的参数做了一次逆向,然后将他们二者concat在一起,所以输出层就为两倍的hidden_dim
1
2
self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=bidirectional)
self.layer = nn.Linear(hidden_dim * (2 if bidirectional else 1), hidden_dim)

双向双层RNN

  • 上面的情况相加:bidirectional参数置为True ,同时num_layers置为2

定义模型

  • 这里的seq_output, final_hidden = self.rnn(x):
    • final_hidden为最后一个的状态输出即(1,128,64)(这里为了进行上述的矩阵相乘,是先将(128,500,16)–> (500,128,16)之后进行的运算,所以最后一个状态即为(1,128,64))
    • seq_output:是上述每个过程的所有状态,即Yt-1,Yt和Yt+1所有的都在,所以其最后一个就是final_hidden,为(128,500,64)即:seq_output[:,-1,:].squeeze() == final_hidden.squeeze()
    • batch_first=True参数表示当前输入为(batch, seq,feature),如果输入做了交换,输入为(seq,batch,feature)则应为False
    • bidirectional=False
    • num_layers=1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class RNN(nn.Module):
def __init__(self, embedding_dim=16, hidden_dim=64, vocab_size=vocab_size, num_layers=1, bidirectional=False):
super(RNN, self).__init__()
self.embeding = nn.Embedding(vocab_size, embedding_dim)
self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=bidirectional)
self.layer = nn.Linear(hidden_dim * (2 if bidirectional else 1), hidden_dim)
self.fc = nn.Linear(hidden_dim, 1)

def forward(self, x):
# [bs, seq length]
x = self.embeding(x)
# [bs, seq length, embedding_dim] -> shape [bs, embedding_dim, seq length]
seq_output, final_hidden = self.rnn(x)
# [bs, seq length, hidden_dim], [*, bs, hidden_dim]
x = seq_output[:, -1, :]
# 取最后一个时间步的输出 (这也是为什么要设置padding_first=True的原因)
x = self.layer(x)
x = self.fc(x)
return x

sample_inputs = torch.randint(0, vocab_size, (2, 128))

print("{:=^80}".format(" 一层单向 RNN "))
for key, value in RNN().named_parameters():
print(f"{key:^40}paramerters num: {np.prod(value.shape)}")


print("{:=^80}".format(" 一层双向 RNN "))
for key, value in RNN(bidirectional=True).named_parameters():
print(f"{key:^40}paramerters num: {np.prod(value.shape)}")


print("{:=^80}".format(" 俩层单向 RNN "))
for key, value in RNN(num_layers=2).named_parameters():
print(f"{key:^40}paramerters num: {np.prod(value.shape)}")

  • (128,500,16)——> (128,64)x(64,64)+(128,16)x(16,64)

  • 8192的来源:上述(128,64)变为(128,64)x(64,64)+(128,128)x(128,64),所以128x64=8,192
  • 4096的来源:64x64=4,096

实战分析

1 构建 rnn 模型

当考虑神经网络层的参数量时, 计算方法如下:

  1. Embedding 层:

    • vocab_size 是词汇表大小。
    • embedding_dim 是嵌入维度。
    • 参数量为 vocab_size * embedding_dim, 因为每个词汇都有一个对应的嵌入向量。
  2. RNN 层:

    • embedding_dim 是输入维度。

    • hidden_dim 是隐藏层维度。

    • RNN 权重参数由两部分组成:

      • weight_ih: 输入到隐藏层的权重, 大小为 (hidden_dim, embedding_dim)。
      • weight_hh: 隐藏层到隐藏层的权重, 大小为 (hidden_dim, hidden_dim)。
    • RNN 偏置参数由两部分组成:

      • bias_ih: 输入到隐藏层的偏置, 大小为 (hidden_dim,)。
      • bias_hh: 隐藏层到隐藏层的偏置, 大小为 (hidden_dim,)。
    • 总参数量为 hidden_dim * (hidden_dim + embedding_dim) + 2 * hidden_dim。

  3. 全连接层 fc:

    • 输入维度是 hidden_dim, 输出维度是 vocab_size。

    • 权重参数大小为 (vocab_size, hidden_dim)。

    • 偏置参数大小为 (vocab_size,)。

    • 参数量为 vocab_size * hidden_dim + vocab_size。

结合你提供的参数数量和上述计算方法:

  • embedding.weight: vocab_size * embedding_dim, 即 65 * 256 = 16640。
  • rnn.weight_ih_l0: hidden_dim * embedding_dim, 即 1024 * 256 = 262144。
  • rnn.weight_hh_l0: hidden_dim * hidden_dim, 即 1024 * 1024 = 1048576。
  • rnn.bias_ih_l0 和 rnn.bias_hh_l0: 每个都有 hidden_dim 个参数, 即 1024 + 1024 = 2048。
  • fc.weight: vocab_size * hidden_dim, 即 65 * 1024 = 66560。
  • fc.bias: vocab_size, 即 65

BRNN 的应用包括:

语音识别(与长时记忆结合)、翻译、手写识别、蛋白质结构预测、词性标记、依赖解析、实体提取

文本生成

使用莎士比亚文集做文本生成

数据准备

1
2
3
4
5
6
7
!wget https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt

with open("./shakespeare.txt", "r", encoding="utf8") as file:
text = file.read()

print("length", len(text))
print(text[0:100])

构造字典

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 1. generate vocab
# 2. build mapping char->id
# 3. data -> id_data 把数据都转为id
# 4. a b c d [EOS] -> [BOS] b c d 预测下一个字符生成的模型,也就是输入是a,输出就是b

#去重,留下独立字符,并排序(排序是为了好看)
vocab = sorted(set(text))
print(len(vocab))
print(vocab)

#每个字符都编好号,enumerate对每一个位置编号,生成的是列表中是元组,下面字典生成式
char2idx = {char:idx for idx, char in enumerate(vocab)}
print(char2idx)

# 把vocab从列表变为ndarray
idx2char = np.array(vocab)
print(idx2char)

#把字符都转换为id
text_as_int = np.array([char2idx[c] for c in text])
print(text_as_int.shape)
print(len(text_as_int))
print(text_as_int[0:10])
print(text[0:10])

Dataset与Dataloader的构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from torch.utils.data import Dataset, DataLoader

class CharDataset(Dataset):
#text_as_int是字符的id列表,seq_length是每个样本的长度
def __init__(self, text_as_int, seq_length):
self.sub_len = seq_length + 1 #一个样本的长度
self.text_as_int = text_as_int
self.num_seq = len(text_as_int) // self.sub_len #样本的个数

def __getitem__(self, index):#index是样本的索引,返回的是一个样本,比如第一个,就是0-100的字符,总计101个字符
return self.text_as_int[index * self.sub_len: (index + 1) * self.sub_len]

def __len__(self): #返回样本的个数
return self.num_seq

#batch是一个列表,列表中的每一个元素是一个样本,有101个字符,前100个是输入,后100个是输出
def collat_fct(batch):
src_list = [] #输入
trg_list = [] #输出
for part in batch:
src_list.append(part[:-1]) #输入
trg_list.append(part[1:]) #输出

src_list = np.array(src_list) #把列表转换为ndarray
trg_list = np.array(trg_list) #把列表转换为ndarray
return torch.Tensor(src_list).to(dtype=torch.int64), torch.Tensor(trg_list).to(dtype=torch.int64) #返回的是一个元组,元组中的每一个元素是一个torch.Tensor

#每个样本的长度是101,也就是100个字符+1个结束符
train_ds = CharDataset(text_as_int, 100)
train_dl = DataLoader(train_ds, batch_size=64, shuffle=True, collate_fn=collat_fct)

定义模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class CharRNN(nn.Module):
def __init__(self, vocab_size, embedding_dim=256, hidden_dim=1024):
super(CharRNN, self).__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
#batch_first=True,输入的数据格式是(batch_size, seq_len, embedding_dim)
self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
self.fc = nn.Linear(hidden_dim, vocab_size)

def forward(self, x, hidden=None):
x = self.embedding(x) #(batch_size, seq_len) -> (batch_size, seq_len, embedding_dim) (64, 100, 256)
#这里和02的差异是没有只拿最后一个输出,而是把所有的输出都拿出来了
#(batch_size, seq_len, embedding_dim)->(batch_size, seq_len, hidden_dim)(64, 100, 1024)
output, hidden = self.rnn(x, hidden)
x = self.fc(output) #[bs, seq_len, hidden_dim]--->[bs, seq_len, vocab_size] (64, 100,65)
return x, hidden #x的shape是(batch_size, seq_len, vocab_size)


vocab_size = len(vocab)

print("{:=^80}".format(" 一层单向 RNN "))
for key, value in CharRNN(vocab_size).named_parameters():
print(f"{key:^40}paramerters num: {np.prod(value.shape)}")

训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# 训练
def training(
model,
train_loader,
epoch,
loss_fct,
optimizer,
save_ckpt_callback=None,
stateful=False # 想用stateful,batch里的数据就必须连续,不能打乱
):
record_dict = {
"train": [],
}

global_step = 0
model.train()
hidden = None
with tqdm(total=epoch * len(train_loader)) as pbar:
for epoch_id in range(epoch):
# training
for datas, labels in train_loader:
datas = datas.to(device)
labels = labels.to(device)
# 梯度清空
optimizer.zero_grad()
# 模型前向计算,如果数据集打乱了,stateful=False,hidden就要清空
# 如果数据集没有打乱,stateful=True,hidden就不需要清空
logits, hidden = model(datas, hidden=hidden if stateful else None)
# 计算损失,交叉熵损失第一个参数要是二阶张量,第二个参数要是一阶张量,所以要reshape
loss = loss_fct(logits.reshape(-1, vocab_size), labels.reshape(-1))
# 梯度回传
loss.backward()
# 调整优化器,包括学习率的变动等
optimizer.step()

loss = loss.cpu().item()
# record

record_dict["train"].append({
"loss": loss, "step": global_step
})

# 保存模型权重 save model checkpoint
if save_ckpt_callback is not None:
save_ckpt_callback(global_step, model.state_dict(), metric=-loss)
# udate step
global_step += 1
pbar.update(1)
pbar.set_postfix({"epoch": epoch_id})

return record_dict


epoch = 100

model = CharRNN(vocab_size=vocab_size)

# 1. 定义损失函数 采用交叉熵损失
loss_fct = nn.CrossEntropyLoss()
# 2. 定义优化器 采用 adam
# Optimizers specified in the torch.optim package
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


# save best
if not os.path.exists("checkpoints"):
os.makedirs("checkpoints")
save_ckpt_callback = SaveCheckpointsCallback("checkpoints/text_generation", save_step=1000, save_best_only=True)


model = model.to(device)

record = training(
model,
train_dl,
epoch,
loss_fct,
optimizer,
save_ckpt_callback=save_ckpt_callback,
)
  • 如果数据集没有打乱,stateful=True,hidden就不需要清空
  • 计算损失,交叉熵损失第一个参数要是二阶张量,第二个参数要是一阶张量,所以要reshape:
    • 第一个参数将原本的(3,100,64)——>(300,64)
    • 第二个参数将原本的(64,100)——>(6400)

推理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import torch

# 创建一个概率分布,表示每个类别被选中的概率
# 这里我们有一个简单的四个类别的概率分布
prob_dist = torch.tensor([0.1, 0.45, 0.35, 0.1])

# 使用 multinomial 进行抽样
# num_samples 表示要抽取的样本数量
num_samples = 5

# 抽取样本,随机抽样,概率越高,抽到的概率就越高
samples = torch.multinomial(prob_dist, 1, replacement=True)

print("概率分布:", prob_dist)
print("抽取的样本索引:", samples)

# 显示每个样本对应的概率
print("每个样本对应的概率:", prob_dist[samples])

def generate_text(model, start_string, max_len=1000, temperature=1.0, stream=True):
input_eval = torch.Tensor([char2idx[char] for char in start_string]).to(dtype=torch.int64, device=device).reshape(1, -1) #bacth_size=1, seq_len长度是多少都可以 (1,5)
hidden = None
text_generated = [] #用来保存生成的文本
model.eval()
pbar = tqdm(range(max_len)) # 进度条
print(start_string, end="")
# no_grad是一个上下文管理器,用于指定在其中的代码块中不需要计算梯度。在这个区域内,不会记录梯度信息,用于在生成文本时不影响模型权重。
with torch.no_grad():
for i in pbar:#控制进度条
logits, hidden = model(input_eval, hidden=hidden)
# 温度采样,较高的温度会增加预测结果的多样性,较低的温度则更加保守。
#取-1的目的是只要最后,拼到原有的输入上
logits = logits[0, -1, :] / temperature
# using multinomial to sampling
probs = F.softmax(logits, dim=-1) #算为概率分布
idx = torch.multinomial(probs, 1).item() #从概率分布中抽取一个样本,取概率较大的那些
input_eval = torch.Tensor([idx]).to(dtype=torch.int64, device=device).reshape(1, -1) #把idx转为tensor
text_generated.append(idx)
if stream:
print(idx2char[idx], end="", flush=True)
return "".join([idx2char[i] for i in text_generated])


# load checkpoints
model.load_state_dict(torch.load("checkpoints/text_generation/best.ckpt", map_location="cpu"))
start_string = "All: " #这里就是开头,什么都可以
res = generate_text(model, start_string, max_len=1000, temperature=0.5, stream=True)
  • logits = logits[0, -1, :] / temperature:

    • 除 temperature之后,softmax的概率值越接近,每个生成越随机。

    • 当 temperature 较小(接近 0)时,模型生成的输出会更加 确定保守

      模型会倾向于选择 概率较高的单词,而 低概率的单词 基本不会被选中。

    • 当 temperature 较大(比如大于 1)时,模型生成的输出会更加 多样化随机

      温度较大时,生成时概率分布变得更加平滑,低概率的单词也有机会被选择,从而 增加了生成文本的多样性