seq2seq

GRU

  • 效果与LSTM相近,并且参数量比LSTM更少,计算复杂度也更小

模型思想-Attention

https://zhuanlan.zhihu.com/p/394166679

原始的 seq2seq
Encoder 和 Decoder 都是循环神经网络实现, 可以是 RNN, LSTM, GRU, CFG 等

GRU 即 Gated Recurrent Unit。 前面说到为了克服 RNN 无法很好处理远距离依赖而提出了LSTM, 而 GRU 则是 LSTM 的一个变体, 当然 LSTM 还有有很多其他的变体。GRU 保持了LSTM的效果同时又使结构更加简单, 所以它也非常流行。

隐含状态不断的传递, 得到 s, s 作为 decoder 的初始状态, 15-16 年非常流行的做机器翻译的框架, 不好的地方前面讲了, 离 s 比较远的稀释的比较厉害

基于 attention 的 seq2seq

seq2seq是第一个提出attention机制的模型。

  • Encoder 每一步的输出都会参与到 decoder 每一步的计算中!
  • 我们会拿 encoder_outputs 和 decoder 中每一步的 hidden_state 去计算 attention
  • Attention 理解为向量, 假如如图中所示 encoder_outputs 有 5 个, 那么 Attention 向量长度为 5, 代表输出的权重, 有了这个权重, 我们就可以拿着权重和 encoder_outputs 做加权求和, 得到 context_vector, 和 input 一块输入下一步的 decoder, 用来做词语的生成, 每一个词语都会这么做。

思路便是:Encoder的encoder_outputs与decoder生成的每一个部分都加进去计算,生成对应的注意力分数,然后用于decoder的下一步生成

模型中的注意力公式运算

  • EO: encoder 各个位置的输出
  • H: decoder 某一步的隐含状态
  • FC: 全连接层
  • X: decoder 的一个输入
  • context: 上下文向量

首先我们会得到一个 score(注意力所需要的一个分数) , score 计算有下面两种方式

  • score = FC(tanh(FC(EO) + FC(H))) —- [Bahdanau 注意力方式], 我们用这个

这里用 tanh 激活函数, 输出有正有负, 会让梯度更新的比较快, 不像 sigmiod 只有正值,score 是和 EO 长度一样的向量,tanh 是激活函数

  • 另一选项(就是 score 的另一种计算方法) : score = EOWH—-[luong 注意力]

让 score 进行接下来处理

  • attention_weights = softmax(score, axis = 1) –就变为了权重
  • context = sum(attention_weights * EO, axis = 1) EO 是矩阵, attention_weights 是向量,context 是向量
  • final_input = concat(context, embed(x)) x 和 context 拼接在一起

机器翻译实战

首先使用一个小数据集英语与西班牙语翻译, 总计有 11 万条, 来验证我们的模型

seq2seq_attention 实战(Sequence-to-Sequence)

实战步骤

  1. preprocessing data —数据 id 化和 dataset 生成
    Tokenizer word level-Tokenizer
  2. build model
    1. encoder 构建(使用 GRU)
    2. attention 构建——实现 Bahdanau —-重点, 难点
    3. decoder 构建:用的 lstm 变种 GRU
    4. loss& optimizer:自定义梯度的更新
    5. train:每次 epoch 调用 train
  3. evaluation (不适合看准确率,使用 bleu)
    1. given sentence, return translated results
    2. visualize results (attention) 注意力分数的可视化

数据预处理

  • 去除西班牙语中的重音
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import unicodedata
import re
from sklearn.model_selection import train_test_split

#因为西班牙语有一些是特殊字符,所以我们需要unicode转ascii,
# 这样值变小了,因为unicode太大
def unicode_to_ascii(s):
#NFD是转换方法,把每一个字节拆开,Mn是重音,所以去除
return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

#下面我们找个样本测试一下
# 加u代表对字符串进行unicode编码
en_sentence = u"May I borrow this book?"
sp_sentence = u"¿Puedo tomar prestado este libro?"

print(unicode_to_ascii(en_sentence))
print(unicode_to_ascii(sp_sentence))
  • 数据预处理,控制标点符号与单词分开
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def preprocess_sentence(w):
#变为小写,去掉多余的空格,变成小写,id少一些
w = unicode_to_ascii(w.lower().strip())

# 在单词与跟在其后的标点符号之间插入一个空格
# eg: "he is a boy." => "he is a boy . "
# Reference:- https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation
w = re.sub(r"([?.!,¿])", r" \1 ", w)
#因为可能有多余空格,替换为一个空格,所以处理一下
w = re.sub(r'[" "]+', " ", w)

# 除了 (a-z, A-Z, ".", "?", "!", ","),将所有字符替换为空格
w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w)

w = w.rstrip().strip()

return w

print(preprocess_sentence(en_sentence))
print(preprocess_sentence(sp_sentence))
print(preprocess_sentence(sp_sentence).encode('utf-8')) #¿是占用两个字节的
Dataset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from pathlib import Path
from torch.utils.data import Dataset, DataLoader

class LangPairDataset(Dataset):
fpath = Path(r"./data_spa_en/spa.txt") #数据文件路径
cache_path = Path(r"./.cache/lang_pair.npy") #缓存文件路径
split_index = np.random.choice(a=["train", "test"], replace=True, p=[0.9, 0.1], size=118964) #按照9:1划分训练集和测试集
def __init__(self, mode="train", cache=False):
if cache or not self.cache_path.exists():#如果没有缓存,或者缓存不存在,就处理一下数据
self.cache_path.parent.mkdir(parents=True, exist_ok=True) #创建缓存文件夹,如果存在就忽略
with open(self.fpath, "r", encoding="utf8") as file:
lines = file.readlines()
lang_pair = [[preprocess_sentence(w) for w in l.split('\t')] for l in lines] #处理数据,变成list((src, trg))的形式
trg, src = zip(*lang_pair) #分离出目标语言和源语言
trg=np.array(trg) #转换为numpy数组
src=np.array(src) #转换为numpy数组
np.save(self.cache_path, {"trg": trg, "src": src}) #保存为npy文件,方便下次直接读取,不用再处理
else:
lang_pair = np.load(self.cache_path, allow_pickle=True).item() #读取npy文件,allow_pickle=True允许读取字典
trg = lang_pair["trg"]
src = lang_pair["src"]

self.trg = trg[self.split_index == mode] #按照index拿到训练集的 标签语言 --英语
self.src = src[self.split_index == mode] #按照index拿到训练集的源语言 --西班牙

def __getitem__(self, index):
return self.src[index], self.trg[index]

def __len__(self):
return len(self.src)


train_ds = LangPairDataset("train")
test_ds = LangPairDataset("test")
  • 使用zip,将生成的list((src, trg)),分离出目标语言和源语言,并放在一起。
1
2
3
4
#zip例子
a = [[1,2],[4,5],[7,8]]
zipped = list(zip(*a))
print(zipped)
Tokenizer

这里有两种处理方式,分别对应着 encoder 和 decoder 的 word embedding 是否共享,这里实现不共享的方案。

构建词表转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from collections import Counter

def get_word_idx(ds, mode="src", threshold=2):
#载入词表,看下词表长度,词表就像英语字典
word2idx = {
"[PAD]": 0, # 填充 token
"[BOS]": 1, # begin of sentence
"[UNK]": 2, # 未知 token
"[EOS]": 3, # end of sentence
}
idx2word = {value: key for key, value in word2idx.items()}
index = len(idx2word)
threshold = 1 # 出现次数低于此的token舍弃
#如果数据集有很多个G,那是用for循环的,不能' '.join
word_list = " ".join([pair[0 if mode=="src" else 1] for pair in ds]).split()
counter = Counter(word_list) #统计词频,counter类似字典,key是单词,value是出现次数
print("word count:", len(counter))

for token, count in counter.items():
if count >= threshold:#出现次数大于阈值的token加入词表
word2idx[token] = index #加入词表
idx2word[index] = token #加入反向词表
index += 1

return word2idx, idx2word

src_word2idx, src_idx2word = get_word_idx(train_ds, "src") #源语言词表
trg_word2idx, trg_idx2word = get_word_idx(train_ds, "trg") #目标语言词表
构建Tokenizer
  • Tokenizer按语言分开可以减少embedding_dim的大小,对于翻译任务建议分开
  • 而问答任务存在多语言回答,就不能分开。
  • mask的作用:masks = (input_ids == self.pad_idx).to(dtype=torch.int64):mask是一个和input_ids一样大小的tensor,0代表token,1代表padding,mask用于去除padding的影响
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Tokenizer:
def __init__(self, word2idx, idx2word, max_length=500, pad_idx=0, bos_idx=1, eos_idx=3, unk_idx=2):
self.word2idx = word2idx
self.idx2word = idx2word
self.max_length = max_length
self.pad_idx = pad_idx
self.bos_idx = bos_idx
self.eos_idx = eos_idx
self.unk_idx = unk_idx

def encode(self, text_list, padding_first=False, add_bos=True, add_eos=True, return_mask=False):
"""如果padding_first == True,则padding加载前面,否则加载后面
return_mask: 是否返回mask(掩码),mask用于指示哪些是padding的,哪些是真实的token
"""
max_length = min(self.max_length, add_eos + add_bos + max([len(text) for text in text_list]))
indices_list = []
for text in text_list:
indices = [self.word2idx.get(word, self.unk_idx) for word in text[:max_length - add_bos - add_eos]] #如果词表中没有这个词,就用unk_idx代替,indices是一个list,里面是每个词的index,也就是一个样本的index
if add_bos:
indices = [self.bos_idx] + indices
if add_eos:
indices = indices + [self.eos_idx]
if padding_first:#padding加载前面,超参可以调
indices = [self.pad_idx] * (max_length - len(indices)) + indices
else:#padding加载后面
indices = indices + [self.pad_idx] * (max_length - len(indices))
indices_list.append(indices)
input_ids = torch.tensor(indices_list) #转换为tensor
masks = (input_ids == self.pad_idx).to(dtype=torch.int64) #mask是一个和input_ids一样大小的tensor,0代表token,1代表padding,mask用于去除padding的影响
return input_ids if not return_mask else (input_ids, masks)


def decode(self, indices_list, remove_bos=True, remove_eos=True, remove_pad=True, split=False):
text_list = []
for indices in indices_list:
text = []
for index in indices:
word = self.idx2word.get(index, "[UNK]") #如果词表中没有这个词,就用unk_idx代替
if remove_bos and word == "[BOS]":
continue
if remove_eos and word == "[EOS]":#如果到达eos,就结束
break
if remove_pad and word == "[PAD]":#如果到达pad,就结束
break
text.append(word) #单词添加到列表中
text_list.append(" ".join(text) if not split else text) #把列表中的单词拼接,变为一个句子
return text_list

#两个相对于1个toknizer的好处是embedding的参数量减少
src_tokenizer = Tokenizer(word2idx=src_word2idx, idx2word=src_idx2word) #源语言tokenizer
trg_tokenizer = Tokenizer(word2idx=trg_word2idx, idx2word=trg_idx2word) #目标语言tokenizer

# trg_tokenizer.encode([["hello"], ["hello", "world"]], add_bos=True, add_eos=False,return_mask=True)
raw_text = ["hello world".split(), "tokenize text datas with batch".split(), "this is a test".split()]
indices,mask = trg_tokenizer.encode(raw_text, padding_first=False, add_bos=True, add_eos=True,return_mask=True)
decode_text = trg_tokenizer.decode(indices.tolist(), remove_bos=False, remove_eos=False, remove_pad=False)
print("raw text"+'-'*10)
for raw in raw_text:
print(raw)
print("mask"+'-'*10)
for m in mask:
print(m)
print("indices"+'-'*10)
for index in indices:
print(index)
print("decode text"+'-'*10)
for decode in decode_text:
print(decode)
Datasetloader
  • 训练过程中,decoder处生成一个token后,下一个token是参考的真实标签label,而不是刚刚生成的token,因为是需要计算对应位置的loss。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def collate_fct(batch):
src_words = [pair[0].split() for pair in batch]
trg_words = [pair[1].split() for pair in batch]

# [PAD] [BOS] src [EOS]
encoder_inputs, encoder_inputs_mask = src_tokenizer.encode(
src_words, padding_first=True, add_bos=True, add_eos=True, return_mask=True
)

# [BOS] trg [PAD]
decoder_inputs = trg_tokenizer.encode(
trg_words, padding_first=False, add_bos=True, add_eos=False, return_mask=False,
)

# trg [EOS] [PAD]
decoder_labels, decoder_labels_mask = trg_tokenizer.encode(
trg_words, padding_first=False, add_bos=False, add_eos=True, return_mask=True
)

return {
"encoder_inputs": encoder_inputs.to(device=device),
"encoder_inputs_mask": encoder_inputs_mask.to(device=device),
"decoder_inputs": decoder_inputs.to(device=device),
"decoder_labels": decoder_labels.to(device=device),
"decoder_labels_mask": decoder_labels_mask.to(device=device),
} #当返回的数据较多时,用dict返回比较合理

sample_dl = DataLoader(train_ds, batch_size=2, shuffle=True, collate_fn=collate_fct)

for batch in sample_dl:
for key, value in batch.items():
print(key)
print(value)
break

上述decoder_inputs = trg_tokenizer.encode和decoder_labels, decoder_labels_mask = trg_tokenizer.encode分别对应decoder的输入输出:

定义模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Encoder(nn.Module):
def __init__(
self,
vocab_size,
embedding_dim=256,
hidden_dim=1024,
num_layers=1,
):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.gru = nn.GRU(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)

def forward(self, encoder_inputs):
# encoder_inputs.shape = [batch size, sequence length]
# bs, seq_len = encoder_inputs.shape
embeds = self.embedding(encoder_inputs)
# embeds.shape = [batch size, sequence length, embedding_dim]->[batch size, sequence length, hidden_dim]
seq_output, hidden = self.gru(embeds)
# seq_output.shape = [batch size, sequence length, hidden_dim],hidden.shape [ num_layers, batch size, hidden_dim]
return seq_output, hidden

#把上面的Encoder写一个例子,看看输出的shape
encoder = Encoder(vocab_size=100, embedding_dim=256, hidden_dim=1024, num_layers=4)
encoder_inputs = torch.randint(0, 100, (2, 50))
encoder_outputs, hidden = encoder(encoder_inputs)
print(encoder_outputs.shape)
print(hidden.shape)
print(encoder_outputs[:,-1,:])
print(hidden[-1,:,:]) #取最后一层的hidden
  • 注意每一层计算的输出内容:def forward(self, encoder_inputs)中的各项。

实现 Bahdanau

Wk、Wq、V分别对应上述公式的 **score = FC(tanh(FC(EO) + FC(H))) —- [Bahdanau 注意力方式]**的 FC(EO)、FC(H)、和 FC(tanh(FC(EO)

正向传播
:param query: decoder的hidden state,是decoder的隐藏状态,shape = [batch size, hidden_dim] ,多层decoder时也只会拿最后一层的 [batch size, hidden_dim]
:param keys: EO [batch size, sequence length, hidden_dim]
:param values: EO [batch size, sequence length, hidden_dim]
:param attn_mask:[batch size, sequence length]
:return:

  • scores = self.V(F.tanh(self.Wk(keys) + self.Wq(query.unsqueeze(-2))))
  • 全连接层只会对你的最后一维做矩阵运算
  • score.shape = [batch size, sequence length, 1]:意味着对于每个句子(batch size)中的每个单词(sequence length),我们都有一个得分。每个单词的得分是通过某些机制(如注意力机制)计算出来的,可以理解为该单词对最终结果的重要性。
  • values:表示与输入序列中每个单词对应的值,通常是隐藏状态向量或编码器的输出。它的形状为 [batch size, sequence length, hidden_dim],其中 hidden_dim 是每个单词的表示维度(如 GRU 或 Transformer 中的隐藏层大小)。
  • torch.mul(scores, values)**:是逐元素的乘法操作,将每个单词的 **score 与对应的 value 相乘。结果的形状将是 [batch size, sequence length, hidden_dim],即对每个时间步,score 和 value 之间的关系已经建立。
  • .sum(dim=-2)*:是对 **scores \ values 的结果沿着 sequence length(即-2维度) 进行求和,得到一个 context_vector。形状变为 [batch size, hidden_dim]
    • dim=-2 表示按序列的维度(时间步)求和。这意味着所有的单词的加权值(根据得分的权重)都会合并到一个向量中,即我们得到的 context_vector 是每个句子的加权平均向量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class BahdanauAttention(nn.Module):
def __init__(self, hidden_dim=1024):
super().__init__()
self.Wk = nn.Linear(hidden_dim, hidden_dim) #对keys做运算,encoder的输出EO
self.Wq = nn.Linear(hidden_dim, hidden_dim) #对query做运算,decoder的隐藏状态
self.V = nn.Linear(hidden_dim, 1)

def forward(self, query, keys, values, attn_mask=None):
"""
正向传播
:param query: hidden state,是decoder的隐藏状态,shape = [batch size, hidden_dim]
:param keys: EO [batch size, sequence length, hidden_dim]
:param values: EO [batch size, sequence length, hidden_dim]
:param attn_mask:[batch size, sequence length]
:return:
"""
# query.shape = [batch size, hidden_dim] -->通过unsqueeze(-2)增加维度 [batch size, 1, hidden_dim]
# keys.shape = [batch size, sequence length, hidden_dim]
# values.shape = [batch size, sequence length, hidden_dim]
scores = self.V(F.tanh(self.Wk(keys) + self.Wq(query.unsqueeze(-2)))) #unsqueeze(-2)增加维度
# score.shape = [batch size, sequence length, 1]
if attn_mask is not None: #这个mask是encoder_inputs_mask,用来mask掉padding的部分,让padding部分socres为0
# attn_mask is a matrix of 0/1 element,
# 1 means to mask logits while 0 means do nothing
# here we add -inf to the element while mask == 1
attn_mask = (attn_mask.unsqueeze(-1)) * -1e16 #在最后增加一个维度,[batch size, sequence length] --> [batch size, sequence length, 1]
scores += attn_mask
scores = F.softmax(scores, dim=-2) #对每一个词的score做softmax
# score.shape = [batch size, sequence length, 1]
context_vector = torch.mul(scores, values).sum(dim=-2) #对每一个词的score和对应的value做乘法,然后在seq_len维度上求和,得到context_vector
# context_vector.shape = [batch size, hidden_dim]
#socres用于最后的画图
return context_vector, scores