Word2Vec 什么是 Word2vec? NLP 里面,最细粒度的是 词语,词语组成句子,句子再组成段落、篇章、文档。所以处理 NLP 的问题,首先就要拿词语开刀。
NLP 里的词语,是人类的抽象总结,是符号形式的(比如中文、英文、拉丁文等等),所以需要把他们转换成数值形式,或者说——嵌入到一个数学空间里,这种嵌入方式,就叫词嵌入(word embedding),而 Word2vec,就是词嵌入( word embedding) 的一种
在 NLP 中,把 x 看做一个句子里的一个词语,y 是这个词语的上下文词语,那么这里的 f,便是 NLP 中经常出现的『语言模型』(language model),这个模型的目的,就是判断 (x,y) 这个样本,是否符合自然语言的法则,更通俗点说就是:词语x和词语y放在一起,是不是人话。
Word2vec 正是来源于这个思想,但它的最终目的,不是要把 f 训练得多么完美,而是只关心模型训练完后的副产物——模型参数(这里特指神经网络的权重),并将这些参数,作为输入 x 的某种向量化的表示,这个向量便叫做——词向量
我们来看个例子,如何用 Word2vec 寻找相似词:
对于一句话:『她们 夸 吴彦祖 帅 到 没朋友』,如果输入 x 是『吴彦祖』,那么 y 可以是『她们』、『夸』、『帅』、『没朋友』这些词
现有另一句话:『她们 夸 我 帅 到 没朋友』,如果输入 x 是『我』,那么不难发现,这里的上下文 y 跟上面一句话一样
从而 f(吴彦祖) = f(我) = y,所以大数据告诉我们:我 = 吴彦祖(完美的结论
上面我们提到了语言模型
如果是用一个词语作为输入,来预测它周围的上下文,那这个模型叫做『Skip-gram 模型』
而如果是拿一个词语的上下文作为输入,来预测这个词语本身,则是 『CBOW 模型』
我们先来看个最简单的例子。上面说到, y 是 x 的上下文,所以 y 只取上下文里一个词语的时候,语言模型就变成:
用当前词 x 预测它的下一个词 y
但如上面所说,一般的数学模型只接受数值型输入,这里的 x 该怎么表示呢? 显然不能用 Word2vec,因为这是我们训练完模型的产物,现在我们想要的是 x 的一个原始输入形式。
答案是:one-hot encoder
当模型训练完后,最后得到的其实是神经网络的权重 ,比如现在输入一个 x 的 one-hot encoder: [1,0,0,…,0],对应刚说的那个词语『吴彦祖』,则在输入层到隐含层的权重里,只有对应 1 这个位置的权重被激活,这些权重的个数,跟隐含层节点数是一致的,从而这些权重组成一个向量 vx 来表示x,而因为每个词语的 one-hot encoder 里面 1 的位置是不同的,所以,这个向量 vx 就可以用来唯一表示 x。
Skip-gram 更一般的情形 一对多
CBOW 更一般的情形 更为常用, 跟 Skip-gram 相似,只不过:
Skip-gram 是预测一个词的上下文,而 CBOW 是用上下文预测这个词
多对一
网络结构如下:
更 Skip-gram 的模型并联不同,这里是输入变成了多个单词,所以要对输入处理下(一般是求和然后平均),输出的 cost function 不变
最终训练好的N-dim的向量(是一个密集向量)就是表示当前输出词语的词向量。 相当于多维特征去表征一个词的词义,可以发现,相近的词向量之间的距离是对应相等的。比如:国王与皇后的距离与男人与女人的距离是相同的。
Embedding Embedding(词嵌入):word2vec是Embedding的一种,
代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from tensorflow import keras imdb = keras.datasets.imdb vocab_size = 10000 index_from = 3 (train_data, train_labels), (test_data, test_labels) = imdb.load_data( num_words=vocab_size, index_from=index_from) word_index = imdb.get_word_index() print(len (word_index)) print(type (word_index))
构造 word2idx 和 idx2word 1 2 3 4 5 6 7 8 9 word2idx = {word: idx + 3 for word, idx in word_index.items()} word2idx.update({ "[PAD]" : 0 , "[BOS]" : 1 , "[UNK]" : 2 , "[EOS]" : 3 , }) idx2word = {idx: word for word, idx in word2idx.items()}
Tokenizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 class Tokenizer : def __init__ (self, word2idx, idx2word, max_length=500 , pad_idx=0 , bos_idx=1 , eos_idx=3 , unk_idx=2 ): self.word2idx = word2idx self.idx2word = idx2word self.max_length = max_length self.pad_idx = pad_idx self.bos_idx = bos_idx self.eos_idx = eos_idx self.unk_idx = unk_idx def encode (self, text_list ): """ 将文本列表转化为索引列表 :param text_list:当前批次的文本列表 :return: """ max_length = min (self.max_length, 2 + max ( [len (text) for text in text_list])) indices = [] for text in text_list: index = [self.word2idx.get(word, self.unk_idx) for word in text] index = [self.bos_idx] + index + [self.eos_idx] if len (index) < max_length: index = index + [self.pad_idx] * (max_length - len (index)) else : index = index[:max_length] indices.append(index) return torch.tensor(indices) def decode (self, indices_list, remove_bos=True , remove_eos=True , remove_pad=True , split=False ): """ 将索引列表转化为文本列表 :param indices_list:某批次的索引列表 :param remove_bos: :param remove_eos: :param remove_pad: :param split: :return: """ text_list = [] for indices in indices_list: text = [] for index in indices: word = self.idx2word.get(index, "[UNK]" ) if remove_bos and word == "[BOS]" : continue if remove_eos and word == "[EOS]" : break if remove_pad and word == "[PAD]" : break text.append(word) text_list.append(" " .join(text) if not split else text) return text_list tokenizer = Tokenizer(word2idx=word2idx, idx2word=idx2word) raw_text = ["hello world" .split(), "tokenize text datas with batch" .split(), "this is a test" .split()] indices = tokenizer.encode(raw_text) decode_text = tokenizer.decode(indices.tolist(), remove_bos=False , remove_eos=False , remove_pad=False ) print("raw text" ) for raw in raw_text: print(raw) print("indices" ) for index in indices: print(index) print("decode text" ) for decode in decode_text: print(decode)
对应效果:
数据集与 DataLoader 由于是tf的数据集,dataset需要重写:
并且注意:由于刚导入train_data是包含ids的一系列数据(其中包含词表大小和get_word_index()),每一个样本是不包含结束符的,所以需要先进行decode然后encode为其补上结束符。并且需要对batch进行encode,统一长度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from torch.utils.data import Dataset, DataLoaderclass IMDBDataset (Dataset ): def __init__ (self, data, labels, remain_length=True ): if remain_length: self.data = tokenizer.decode(data, remove_bos=False , remove_eos=False , remove_pad=False ) else : self.data = tokenizer.decode(data) self.labels = labels def __getitem__ (self, index ): text = self.data[index] label = self.labels[index] return text, label def __len__ (self ): return len (self.data) def collate_fct (batch ): """ 回调函数: 将batch(一批)数据处理成tensor形式 :param batch: :return: """ text_list = [item[0 ].split() for item in batch] label_list = [item[1 ] for item in batch] text_list = tokenizer.encode(text_list).to(dtype=torch.int ) return text_list, torch.tensor(label_list).reshape(-1 , 1 ).to(dtype=torch.float ) train_ds = IMDBDataset(train_data, train_labels) test_ds = IMDBDataset(test_data, test_labels) batch_size = 128 train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True , collate_fn=collate_fct) test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False , collate_fn=collate_fct)
此时的train_dl为:[128,500],为了进一步将其转变为密集向量,将对其转换为one-hot编码:[128,500,10000],相当于将原本的每条数据中的每个id改为对应的one-hot编码(10000为vocab_size)
选择10000*16的矩阵(需要训练,包含权值w),这样通过矩阵相乘,最后可以得到500x16的矩阵,这样500个词每一个词都又一个16的密集向量所表示。
定义模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 m = nn.AdaptiveAvgPool1d(1 ) input = torch.randn(1 , 3 , 6 )output = m(input ) output.size() class AddingModel (nn.Module ): def __init__ (self, embedding_dim=16 , hidden_dim=64 , vocab_size=vocab_size ): super (AddingModel, self).__init__() self.embeding = nn.Embedding(vocab_size, embedding_dim) self.pool = nn.AdaptiveAvgPool1d(1 ) self.layer = nn.Linear(embedding_dim, hidden_dim) self.fc = nn.Linear(hidden_dim, 1 ) def forward (self, x ): x = self.embeding(x) x = x.permute(0 , 2 , 1 ) x = self.pool(x) x=x.squeeze(2 ) x = self.layer(x) x = self.fc(x) return x for key, value in AddingModel().named_parameters(): print(f"{key:^40 } paramerters num: {np.prod(value.shape)} " )
训练
损失函数使用的是二进制交叉熵损失 :torch.nn.functional.binary_cross_entropy_with_logits, 先sigmoid再计算交叉熵
二进制交叉熵损失 又叫对数似然损失:用于使用一个神经元,做二分类,也即逻辑回归所使用的分类。
而多个神经元做多分类的时候,则使用的是交叉熵损失(只有前面一半)
binary_cross_entropy_with_logits与.binary_cross_entropy的区别:在于with_logits会先进行sigmoid,再计算交叉熵。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 def training ( model, train_loader, val_loader, epoch, loss_fct, optimizer, tensorboard_callback=None , save_ckpt_callback=None , early_stop_callback=None , eval_step=500 , ): record_dict = { "train" : [], "val" : [] } global_step = 0 model.train() with tqdm(total=epoch * len (train_loader)) as pbar: for epoch_id in range (epoch): for datas, labels in train_loader: datas = datas.to(device) labels = labels.to(device) optimizer.zero_grad() logits = model(datas) loss = loss_fct(logits, labels) loss.backward() optimizer.step() preds = logits > 0 acc = accuracy_score(labels.cpu().numpy(), preds.cpu().numpy()) loss = loss.cpu().item() record_dict["train" ].append({ "loss" : loss, "acc" : acc, "step" : global_step }) if global_step % eval_step == 0 : model.eval () val_loss, val_acc = evaluating(model, val_loader, loss_fct) record_dict["val" ].append({ "loss" : val_loss, "acc" : val_acc, "step" : global_step }) model.train() if tensorboard_callback is not None : tensorboard_callback( global_step, loss=loss, val_loss=val_loss, acc=acc, val_acc=val_acc, lr=optimizer.param_groups[0 ]["lr" ], ) if save_ckpt_callback is not None : save_ckpt_callback(global_step, model.state_dict(), metric=val_acc) if early_stop_callback is not None : early_stop_callback(val_acc) if early_stop_callback.early_stop: print(f"Early stop at epoch {epoch_id} / global_step {global_step} " ) return record_dict global_step += 1 pbar.update(1 ) pbar.set_postfix({"epoch" : epoch_id}) return record_dict epoch = 20 model = AddingModel() loss_fct = F.binary_cross_entropy_with_logits optimizer = torch.optim.Adam(model.parameters(), lr=0.001 ) if not os.path.exists("runs" ): os.mkdir("runs" ) tensorboard_callback = TensorBoardCallback("runs/imdb-adding" ) if not os.path.exists("checkpoints" ): os.makedirs("checkpoints" ) save_ckpt_callback = SaveCheckpointsCallback("checkpoints/imdb-adding" , save_step=len (train_dl), save_best_only=True ) early_stop_callback = EarlyStopCallback(patience=5 ) model = model.to(device) record = training( model, train_dl, test_dl, epoch, loss_fct, optimizer, tensorboard_callback=tensorboard_callback, save_ckpt_callback=save_ckpt_callback, early_stop_callback=early_stop_callback, eval_step=len (train_dl) )