循环神经网络(RNN) 为什么需要RNN 合并+ padding 的缺点
信息丢失
多个 embedding 合并 (求均值)
Pad 噪音、 无主次 (padding 太多, 即便没有 padding, 一些重要的表达情感的词语, 没有主次)
无效计算太多,低效
序列问题:由于普通的神经网络(比如Embedding)是将多个句子词语通过全局平均池化合并成一个向量,来表示文章的积极和消极,这就导致信息丢失,除此之外,全局平均池化也没有考虑句子的语序问题,一个词语出现的先后效果也是不同的。
图 1: 普通神经网络( Vanilla Neural Networks ) 一个样本对应一个输出(卷积神经网络,全连接神经网络)
图 2: 1 对多:图片 生成描述
图 3: 多对 1:文本分类(文本情感分析)
图 4: 多对多: encoding-decoding,机器翻译, 需要在输入最后一个结束后, 才能进行输出,是非实时的
图 5: 实时多对多:视频解说
核心: 维护一个状态作为下一步的额外输入 每一步使用同样的激活函数和参数 也就是 RNN 的 w 是不变的, 如下图所示
这里有知乎的解释 https://zhuanlan.zhihu.com/p/30844905
SimpleRNN 是一个单向单层循环神经网络
这里的(16,64) 就是对应的(embedding dim, hidden dim)
W就是(64,64),U就是(16,64)这两部份就是RNN的参数
单向双层RNN
RNN(num_layers=2):num_layers置为2
双向单层RNN 单向进行的时候,最前面的词往往遗忘的更快,也即衰减的更多,所以为了保持前后的影响一致,从后到前再做一个
RNN(bidirectional=True),bidirectional参数置为True
双向参数量翻倍, 用另外一组参数做逆向
因为是相当于使用同样的一层的参数做了一次逆向,然后将他们二者concat在一起,所以输出层就为两倍的hidden_dim
1 2 self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True , bidirectional=bidirectional) self.layer = nn.Linear(hidden_dim * (2 if bidirectional else 1 ), hidden_dim)
双向双层RNN
上面的情况相加:bidirectional参数置为True ,同时num_layers置为2
定义模型
这里的seq_output, final_hidden = self.rnn(x):
final_hidden为最后一个的状态输出即(1,128,64)(这里为了进行上述的矩阵相乘,是先将(128,500,16)–> (500,128,16)之后进行的运算,所以最后一个状态即为(1,128,64))
seq_output:是上述每个过程的所有状态,即Yt-1,Yt和Yt+1所有的都在,所以其最后一个就是final_hidden,为(128,500,64)即:seq_output[:,-1,:].squeeze() == final_hidden.squeeze()
batch_first=True参数表示当前输入为(batch, seq,feature),如果输入做了交换,输入为(seq,batch,feature)则应为False
bidirectional=False
num_layers=1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class RNN (nn.Module ): def __init__ (self, embedding_dim=16 , hidden_dim=64 , vocab_size=vocab_size, num_layers=1 , bidirectional=False ): super (RNN, self).__init__() self.embeding = nn.Embedding(vocab_size, embedding_dim) self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True , bidirectional=bidirectional) self.layer = nn.Linear(hidden_dim * (2 if bidirectional else 1 ), hidden_dim) self.fc = nn.Linear(hidden_dim, 1 ) def forward (self, x ): x = self.embeding(x) seq_output, final_hidden = self.rnn(x) x = seq_output[:, -1 , :] x = self.layer(x) x = self.fc(x) return x sample_inputs = torch.randint(0 , vocab_size, (2 , 128 )) print("{:=^80}" .format (" 一层单向 RNN " )) for key, value in RNN().named_parameters(): print(f"{key:^40 } paramerters num: {np.prod(value.shape)} " ) print("{:=^80}" .format (" 一层双向 RNN " )) for key, value in RNN(bidirectional=True ).named_parameters(): print(f"{key:^40 } paramerters num: {np.prod(value.shape)} " ) print("{:=^80}" .format (" 俩层单向 RNN " )) for key, value in RNN(num_layers=2 ).named_parameters(): print(f"{key:^40 } paramerters num: {np.prod(value.shape)} " )
(128,500,16)——> (128,64)x(64,64)+(128,16)x(16,64)
8192的来源:上述(128,64)变为(128,64)x(64,64)+(128,128)x(128,64) ,所以128x64=8,192
4096的来源:64x64=4,096
实战分析 1 构建 rnn 模型 当考虑神经网络层的参数量时, 计算方法如下:
Embedding 层:
vocab_size 是词汇表大小。
embedding_dim 是嵌入维度。
参数量为 vocab_size * embedding_dim, 因为每个词汇都有一个对应的嵌入向量。
RNN 层:
全连接层 fc:
输入维度是 hidden_dim, 输出维度是 vocab_size。
权重参数大小为 (vocab_size, hidden_dim)。
偏置参数大小为 (vocab_size,)。
参数量为 vocab_size * hidden_dim + vocab_size。
结合你提供的参数数量和上述计算方法:
embedding.weight: vocab_size * embedding_dim, 即 65 * 256 = 16640。
rnn.weight_ih_l0: hidden_dim * embedding_dim, 即 1024 * 256 = 262144。
rnn.weight_hh_l0: hidden_dim * hidden_dim, 即 1024 * 1024 = 1048576。
rnn.bias_ih_l0 和 rnn.bias_hh_l0: 每个都有 hidden_dim 个参数, 即 1024 + 1024 = 2048。
fc.weight: vocab_size * hidden_dim, 即 65 * 1024 = 66560。
fc.bias: vocab_size, 即 65
BRNN 的应用包括: 语音识别(与长时记忆结合)、翻译、手写识别、蛋白质结构预测、词性标记、依赖解析、实体提取
文本生成 使用莎士比亚文集做文本生成
数据准备 1 2 3 4 5 6 7 !wget https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt with open ("./shakespeare.txt" , "r" , encoding="utf8" ) as file: text = file.read() print("length" , len (text)) print(text[0 :100 ])
构造字典 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 vocab = sorted (set (text)) print(len (vocab)) print(vocab) char2idx = {char:idx for idx, char in enumerate (vocab)} print(char2idx) idx2char = np.array(vocab) print(idx2char) text_as_int = np.array([char2idx[c] for c in text]) print(text_as_int.shape) print(len (text_as_int)) print(text_as_int[0 :10 ]) print(text[0 :10 ])
Dataset与Dataloader的构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from torch.utils.data import Dataset, DataLoaderclass CharDataset (Dataset ): def __init__ (self, text_as_int, seq_length ): self.sub_len = seq_length + 1 self.text_as_int = text_as_int self.num_seq = len (text_as_int) // self.sub_len def __getitem__ (self, index ): return self.text_as_int[index * self.sub_len: (index + 1 ) * self.sub_len] def __len__ (self ): return self.num_seq def collat_fct (batch ): src_list = [] trg_list = [] for part in batch: src_list.append(part[:-1 ]) trg_list.append(part[1 :]) src_list = np.array(src_list) trg_list = np.array(trg_list) return torch.Tensor(src_list).to(dtype=torch.int64), torch.Tensor(trg_list).to(dtype=torch.int64) train_ds = CharDataset(text_as_int, 100 ) train_dl = DataLoader(train_ds, batch_size=64 , shuffle=True , collate_fn=collat_fct)
定义模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class CharRNN (nn.Module ): def __init__ (self, vocab_size, embedding_dim=256 , hidden_dim=1024 ): super (CharRNN, self).__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim) self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True ) self.fc = nn.Linear(hidden_dim, vocab_size) def forward (self, x, hidden=None ): x = self.embedding(x) output, hidden = self.rnn(x, hidden) x = self.fc(output) return x, hidden vocab_size = len (vocab) print("{:=^80}" .format (" 一层单向 RNN " )) for key, value in CharRNN(vocab_size).named_parameters(): print(f"{key:^40 } paramerters num: {np.prod(value.shape)} " )
训练 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 def training ( model, train_loader, epoch, loss_fct, optimizer, save_ckpt_callback=None , stateful=False ): record_dict = { "train" : [], } global_step = 0 model.train() hidden = None with tqdm(total=epoch * len (train_loader)) as pbar: for epoch_id in range (epoch): for datas, labels in train_loader: datas = datas.to(device) labels = labels.to(device) optimizer.zero_grad() logits, hidden = model(datas, hidden=hidden if stateful else None ) loss = loss_fct(logits.reshape(-1 , vocab_size), labels.reshape(-1 )) loss.backward() optimizer.step() loss = loss.cpu().item() record_dict["train" ].append({ "loss" : loss, "step" : global_step }) if save_ckpt_callback is not None : save_ckpt_callback(global_step, model.state_dict(), metric=-loss) global_step += 1 pbar.update(1 ) pbar.set_postfix({"epoch" : epoch_id}) return record_dict epoch = 100 model = CharRNN(vocab_size=vocab_size) loss_fct = nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.001 ) if not os.path.exists("checkpoints" ): os.makedirs("checkpoints" ) save_ckpt_callback = SaveCheckpointsCallback("checkpoints/text_generation" , save_step=1000 , save_best_only=True ) model = model.to(device) record = training( model, train_dl, epoch, loss_fct, optimizer, save_ckpt_callback=save_ckpt_callback, )
如果数据集没有打乱,stateful=True,hidden就不需要清空
计算损失,交叉熵损失第一个参数要是二阶张量,第二个参数要是一阶张量,所以要reshape:
第一个参数将原本的(3,100,64)——>(300,64)
第二个参数将原本的(64,100)——>(6400)
推理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import torchprob_dist = torch.tensor([0.1 , 0.45 , 0.35 , 0.1 ]) num_samples = 5 samples = torch.multinomial(prob_dist, 1 , replacement=True ) print("概率分布:" , prob_dist) print("抽取的样本索引:" , samples) print("每个样本对应的概率:" , prob_dist[samples]) def generate_text (model, start_string, max_len=1000 , temperature=1.0 , stream=True ): input_eval = torch.Tensor([char2idx[char] for char in start_string]).to(dtype=torch.int64, device=device).reshape(1 , -1 ) hidden = None text_generated = [] model.eval () pbar = tqdm(range (max_len)) print(start_string, end="" ) with torch.no_grad(): for i in pbar: logits, hidden = model(input_eval, hidden=hidden) logits = logits[0 , -1 , :] / temperature probs = F.softmax(logits, dim=-1 ) idx = torch.multinomial(probs, 1 ).item() input_eval = torch.Tensor([idx]).to(dtype=torch.int64, device=device).reshape(1 , -1 ) text_generated.append(idx) if stream: print(idx2char[idx], end="" , flush=True ) return "" .join([idx2char[i] for i in text_generated]) model.load_state_dict(torch.load("checkpoints/text_generation/best.ckpt" , map_location="cpu" )) start_string = "All: " res = generate_text(model, start_string, max_len=1000 , temperature=0.5 , stream=True )