长短期记忆网络 LSTM

为什么需要 LSTM (Long short-term memory)

普通 RNN 的信息不能长久传播(存在于理论上)—原因是针对结尾较远的信息被稀释的比较厉害

  • 引入选择性机制(门机制)

    • 选择性输入

    • 选择性遗忘

    • 选择性输出

选择性机制实现的原理 ——> 门:Sigmoid 函数: [0,1]

LSTM(长短时记忆) 是一种循环神经网络, 具有门控结构, 用于处理序列数据。其运算过程可以概括为以下几个步骤:

  1. 输入门(Input Gate) : 计算当前输入和前一时刻的输出是否应该被记忆。
  2. 遗忘门(Forget Gate) : 决定前一时刻输出, 结合当前的输入, 哪些记忆是否被保留。
  3. 记忆单元(Memory Cell) : 根据输入门和遗忘门的结果更新记忆状态。
  4. 输出门(Output Gate) : 基于当前输入和记忆状态计算当前时刻的输出。

https://www.bilibili.com/video/BV1Z34y1k7mc/?spm_id_from=333.1387.search.video_card.click&vd_source=c9b1c252315e6753ab148ae6b39a7dc3

https://blog.csdn.net/weixin_44162104/article/details/88660003

LSTM 图示

以下图来进行理解:

  • 遗忘门:用于使用sigmoid,取值在(0,1),删去Ct-1中的取值为0的元素,相当于选择性遗忘了部分记忆
  • 输入门:第一部份进行选择,然后使用tanh,取值在(-1,1),不是遗忘而是进行梳理归纳,并写入C中
  • Ct=f1 * Ct-1 + f2:先相乘再相加,更新了Ct。用于传递以及更新St等到Yt
  • 可以理解为在短期记忆的RNN上(St这条竖线),加上了一个长期记忆的节点(Ct这条竖线线)中间的交互即为,遗忘门和输入门。负责删去和写入内容。并对Yt进行影响

对应的图示:

LSTM 公式

  • 从上面公式可以看出, 参数量是 rnn 的 4 倍
  • Ht-1 和 xt 拼接是直接加起来的
  • LSTM的计算过程,权重参数数目,weight_ih_l0,weight_hh_l0
  • 源码 lstm 是继承 rnn 实现的
  • RNN 没有细胞状态; LSTM 通过细胞状态记忆信息。RNN 激活函数只有 tanh; LSTM 通过输入门、 遗忘门、 输出门引入 sigmoid 函数并结合 tanh 函数, 添加求和操作, 减少梯度消失和梯度爆炸的可能性。RNN 只能够处理短期依赖问题; LSTM 既能够处理短期依赖问题, 又能够处理长期依赖问题。

文本分类

  • 代码和RNN相同,只是在nn.RNN改为nn.LSTM。embedding_dim和hidden_dim和RNN一样。
  • 参数量是 RNN 的 4 倍:对应的四个W[ht-1,xt] (4x16x64)(4x64x64) 和 四个b

让我们来解析这些参数的数量和计算方式:

  1. embedding.weight:

    • vocab_size 是词汇表大小。

    • embedding_dim 是嵌入维度。

    • 参数量为 vocab_size * embedding_dim

  2. lstm.weight_ih_l0 和 lstm.weight_hh_l0:

    • embedding_dim 是输入维度。
    • hidden_dim 是 LSTM 隐藏层的维度。
    • 对于单向 LSTM:
      • weight_ih 是输入到隐藏层的权重, 大小为 (4 * hidden_dim, embedding_dim)。
      • weight_hh 是隐藏层到隐藏层的权重, 大小为 (4 * hidden_dim, hidden_dim)。
    • 对于双向 LSTM:
      • weight_ih 是输入到隐藏层的权重, 大小为 (4 * hidden_dim, embedding_dim)。
      • weight_hh 是隐藏层到隐藏层的权重, 大小为 (4 * hidden_dim, hidden_dim)。
    • 参数量计算为 (4 * hidden_dim * (embedding_dim + hidden_dim))。
  3. lstm.bias_ih_l0 和 lstm.bias_hh_l0:

    • LSTM 层的偏置参数。
    • 对于单向 LSTM, 每个都有 4 * hidden_dim 个参数。
    • 对于双向 LSTM, 每个都有 8 * hidden_dim 个参数。
    • 参数量计算为 4 * hidden_dim 或 8 * hidden_dim。
  4. layer.weight:

    • 将隐藏状态维度从 hidden_dim * (2 if bidirectional else 1) 转换为 hidden_dim。
    • 参数量计算为 hidden_dim * hidden_dim。
  5. layer.bias:

    • 线性层的偏置参数。
    • 参数量计算为 hidden_dim。
  6. fc.weight:

    • 输入维度是 hidden_dim, 输出维度是 1。
    • 参数量计算为 hidden_dim * 1 = hidden_dim。
  7. fc.bias:

    • 输出层的偏置参数。
    • 参数量计算为 1

根据提供的参数数量和上述计算方式:

  • embedding.weight: vocab_size * embedding_dim = 10000 * 16 = 160000。
  • lstm.weight_ih_l0lstm.weight_hh_l0 : 4 * hidden_dim * (embedding_dim + hidden_dim), 即 4 * 64 * 16 = 4096 和 4 * 64 * 64 = 16384。
  • lstm.bias_ih_l0lstm.bias_hh_l0: 4 * hidden_dim = 256。
  • layer.weight: hidden_dim * hidden_dim = 64 * 64 = 4096。
  • layer.bias: hidden_dim = 64。
  • fc.weight: hidden_dim = 64。
  • fc.bias: 1。

代码

准备数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from tensorflow import keras
#用karas有的数据集imdb,电影分类,分电影是积极的,还是消极的
imdb = keras.datasets.imdb
#载入数据使用下面两个参数
vocab_size = 10000 #词典大小,仅保留训练数据中前10000个最经常出现的单词,低频单词被舍弃
index_from = 3 #0,1,2,3空出来做别的事
#前一万个词出现词频最高的会保留下来进行处理,后面的作为特殊字符处理,
# 小于3的id都是特殊字符,下面代码有写
# 需要注意的一点是取出来的词表还是从1开始的,需要做处理
(train_data, train_labels), (test_data, test_labels) = imdb.load_data(
num_words = vocab_size, index_from = index_from)

#载入词表,看下词表长度,词表就像英语字典
word_index = imdb.get_word_index()
print(len(word_index))
print(type(word_index))
#词表虽然有8万多,但是我们只载入了最高频的1万词!!!!

构造 word2idx 和 idx2word

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
word2idx = {word: idx + 3 for word, idx in word_index.items()}
word2idx.update({
"[PAD]": 0, # 填充 token
"[BOS]": 1, # begin of sentence
"[UNK]": 2, # 未知 token
"[EOS]": 3, # end of sentence
})

idx2word = {idx: word for word, idx in word2idx.items()}

# 选择 max_length
length_collect = {}
for text in train_data:
length = len(text)
length_collect[length] = length_collect.get(length, 0) + 1

MAX_LENGTH = 500
plt.bar(length_collect.keys(), length_collect.values())
plt.axvline(MAX_LENGTH, label="max length", c="gray", ls=":")
plt.legend()
plt.show()

Tokenizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Tokenizer:
def __init__(self, word2idx, idx2word, max_length=500, pad_idx=0, bos_idx=1, eos_idx=3, unk_idx=2):
self.word2idx = word2idx
self.idx2word = idx2word
self.max_length = max_length
self.pad_idx = pad_idx
self.bos_idx = bos_idx
self.eos_idx = eos_idx
self.unk_idx = unk_idx

def encode(self, text_list, padding_first=False):
"""如果padding_first == True,则padding加载前面,否则加载后面"""
max_length = min(self.max_length, 2 + max([len(text) for text in text_list]))
indices_list = []
for text in text_list:
indices = [self.bos_idx] + [self.word2idx.get(word, self.unk_idx) for word in text[:max_length-2]] + [self.eos_idx]
if padding_first:
indices = [self.pad_idx] * (max_length - len(indices)) + indices
else:
indices = indices + [self.pad_idx] * (max_length - len(indices))
indices_list.append(indices)
return torch.tensor(indices_list)


def decode(self, indices_list, remove_bos=True, remove_eos=True, remove_pad=True, split=False):
text_list = []
for indices in indices_list:
text = []
for index in indices:
word = self.idx2word.get(index, "[UNK]")
if remove_bos and word == "[BOS]":
continue
if remove_eos and word == "[EOS]":
break
if remove_pad and word == "[PAD]":
break
text.append(word)
text_list.append(" ".join(text) if not split else text)
return text_list


tokenizer = Tokenizer(word2idx=word2idx, idx2word=idx2word)
raw_text = ["hello world".split(), "tokenize text datas with batch".split(), "this is a test".split()]
indices = tokenizer.encode(raw_text, padding_first=True)
decode_text = tokenizer.decode(indices.tolist(), remove_bos=False, remove_eos=False, remove_pad=False)
print("raw text")
for raw in raw_text:
print(raw)
print("indices")
for index in indices:
print(index)
print("decode text")
for decode in decode_text:
print(decode)

数据集与 DataLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from torch.utils.data import Dataset, DataLoader

class IMDBDataset(Dataset):
def __init__(self, data, labels, remain_length=True):
if remain_length:
self.data = tokenizer.decode(data, remove_bos=False, remove_eos=False, remove_pad=False)
else:
# 缩减一下数据
self.data = tokenizer.decode(data)
self.labels = labels

def __getitem__(self, index):
text = self.data[index]
label = self.labels[index]
return text, label

def __len__(self):
return len(self.data)


def collate_fct(batch):
text_list = [item[0].split() for item in batch]
label_list = [item[1] for item in batch]
# 这里使用 padding first
text_list = tokenizer.encode(text_list, padding_first=True).to(dtype=torch.int)
return text_list, torch.tensor(label_list).reshape(-1, 1).to(dtype=torch.float)


# 用RNN,缩短序列长度
train_ds = IMDBDataset(train_data, train_labels, remain_length=False)
test_ds = IMDBDataset(test_data, test_labels, remain_length=False)

batch_size = 128
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fct)
test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fct)

定义模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class LSTM(nn.Module):
def __init__(self, embedding_dim=16, hidden_dim=64, vocab_size=vocab_size, num_layers=1, bidirectional=False):
super(LSTM, self).__init__()
self.embeding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=bidirectional)
self.layer = nn.Linear(hidden_dim * (2 if bidirectional else 1), hidden_dim)
self.fc = nn.Linear(hidden_dim, 1)

def forward(self, x):
# [bs, seq length]
x = self.embeding(x)
# [bs, seq length, embedding_dim] -> shape [bs, embedding_dim, seq length]
seq_output, (hidden, cell) = self.lstm(x)
# [bs, seq length, hidden_dim], [*, bs, hidden_dim]
x = seq_output[:, -1, :]
# 取最后一个时间步的输出 (这也是为什么要设置padding_first=True的原因)
x = self.layer(x)
x = self.fc(x)
return x

sample_inputs = torch.randint(0, vocab_size, (2, 128))

print("{:=^80}".format(" 一层单向 LSTM "))
for key, value in LSTM().named_parameters():
print(f"{key:^40}paramerters num: {np.prod(value.shape)}")


print("{:=^80}".format(" 一层双向 LSTM "))
for key, value in LSTM(bidirectional=True).named_parameters():
print(f"{key:^40}paramerters num: {np.prod(value.shape)}")


print("{:=^80}".format(" 两层单向 LSTM "))
for key, value in LSTM(num_layers=2).named_parameters():
print(f"{key:^40}paramerters num: {np.prod(value.shape)}")

训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from sklearn.metrics import accuracy_score

@torch.no_grad()
def evaluating(model, dataloader, loss_fct):
loss_list = []
pred_list = []
label_list = []
for datas, labels in dataloader:
datas = datas.to(device)
labels = labels.to(device)
# 前向计算
logits = model(datas)
loss = loss_fct(logits, labels) # 验证集损失
loss_list.append(loss.item())
# 二分类
preds = logits > 0
pred_list.extend(preds.cpu().numpy().tolist())
label_list.extend(labels.cpu().numpy().tolist())

acc = accuracy_score(label_list, pred_list)
return np.mean(loss_list), acc

# 训练
def training(
model,
train_loader,
val_loader,
epoch,
loss_fct,
optimizer,
tensorboard_callback=None,
save_ckpt_callback=None,
early_stop_callback=None,
eval_step=500,
):
record_dict = {
"train": [],
"val": []
}

global_step = 0
model.train()
with tqdm(total=epoch * len(train_loader)) as pbar:
for epoch_id in range(epoch):
# training
for datas, labels in train_loader:
datas = datas.to(device)
labels = labels.to(device)
# 梯度清空
optimizer.zero_grad()
# 模型前向计算
logits = model(datas)
# 计算损失
loss = loss_fct(logits, labels)
# 梯度回传
loss.backward()
# 调整优化器,包括学习率的变动等
optimizer.step()
preds = logits > 0

acc = accuracy_score(labels.cpu().numpy(), preds.cpu().numpy())
loss = loss.cpu().item()
# record

record_dict["train"].append({
"loss": loss, "acc": acc, "step": global_step
})

# evaluating
if global_step % eval_step == 0:
model.eval()
val_loss, val_acc = evaluating(model, val_loader, loss_fct)
record_dict["val"].append({
"loss": val_loss, "acc": val_acc, "step": global_step
})
model.train()

# 1. 使用 tensorboard 可视化
if tensorboard_callback is not None:
tensorboard_callback(
global_step,
loss=loss, val_loss=val_loss,
acc=acc, val_acc=val_acc,
lr=optimizer.param_groups[0]["lr"],
)

# 2. 保存模型权重 save model checkpoint
if save_ckpt_callback is not None:
save_ckpt_callback(global_step, model.state_dict(), metric=val_acc)

# 3. 早停 Early Stop
if early_stop_callback is not None:
early_stop_callback(val_acc)
if early_stop_callback.early_stop:
print(f"Early stop at epoch {epoch_id} / global_step {global_step}")
return record_dict

# udate step
global_step += 1
pbar.update(1)
pbar.set_postfix({"epoch": epoch_id})

return record_dict


epoch = 20

model = LSTM()

# 1. 定义损失函数 采用交叉熵损失 (但是二分类)
loss_fct = F.binary_cross_entropy_with_logits
# 2. 定义优化器 采用 adam
# Optimizers specified in the torch.optim package
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 1. tensorboard 可视化
if not os.path.exists("runs"):
os.mkdir("runs")
tensorboard_callback = TensorBoardCallback("runs/imdb-lstm")
# tensorboard_callback.draw_model(model, [1, MAX_LENGTH])
# 2. save best
if not os.path.exists("checkpoints"):
os.makedirs("checkpoints")
save_ckpt_callback = SaveCheckpointsCallback("checkpoints/imdb-lstm", save_step=len(train_dl), save_best_only=True)
# 3. early stop
early_stop_callback = EarlyStopCallback(patience=10)

model = model.to(device)
record = training(
model,
train_dl,
test_dl,
epoch,
loss_fct,
optimizer,
tensorboard_callback=tensorboard_callback,
save_ckpt_callback=save_ckpt_callback,
early_stop_callback=early_stop_callback,
eval_step=len(train_dl)
)