Deep Learning: The Principles of Skip-Gram and CBOW

Word2vec

1. One-Hot Encoding

For example, the one-hot encodings for the vocabulary \(V = (apple, going, I, home, machine, learning)\) are:

\(\begin{aligned} & apple = (1, 0, 0, 0, 0, 0) \\ & machine = (0, 0, 0, 0, 1, 0) \\ & learning = (0, 0, 0, 0, 0, 1) \\ & I, going, home = (0, 1, 1, 1, 0, 0) \end{aligned}\)

Drawbacks of the sparse (one-hot) representation:

  1. Sparsity;
  2. It cannot express similarity between words (the inner product of any two different words is 0; see the sketch below);
  3. Weak expressive power.
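As a quick illustration of point 2, here is a minimal sketch (the toy vocabulary and helper below are hypothetical, chosen to match the example above) showing that one-hot vectors of different words always have an inner product of 0:

import numpy as np

# Toy vocabulary matching the example above
vocab = ['apple', 'going', 'I', 'home', 'machine', 'learning']
word2idx = {w: i for i, w in enumerate(vocab)}

def one_hot(word):
    # One-hot vector: all zeros except a single 1 at the word's index
    vec = np.zeros(len(vocab))
    vec[word2idx[word]] = 1
    return vec

# Different words are always orthogonal, so one-hot encoding cannot express similarity
print(one_hot('machine') @ one_hot('learning'))  # 0.0
print(one_hot('machine') @ one_hot('machine'))   # 1.0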

2. Distributed Representations

Distributed representations lie at the core of machine learning, for example:

\(\begin{aligned} & apple = (0.1, 0.3, 0.5, 0.1) \\ & machine = (0.2, 0.3, 0.1, 0.6) \\ & learning = (0.1, 0.2, 0.6, 0.1) \\ & I, going, home = (0.5, 1.1, 0.5, 0.2) \end{aligned}\)

Advantages of distributed representations:

  1. Distributed representations can capture semantic relatedness between words;
  2. They have strong expressive power (dense meaning), since dense vectors can in principle distinguish a virtually unlimited number of words;
  3. They generalize well (global representation).

However, distributed representations cannot be obtained by direct counting; they have to be learned by an algorithm.
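As a small sketch of why dense vectors help (reusing only the toy numbers from the table above, which are illustrative rather than learned embeddings), cosine similarity now gives non-trivial values:

import numpy as np

# Toy dense vectors from the example above
apple    = np.array([0.1, 0.3, 0.5, 0.1])
machine  = np.array([0.2, 0.3, 0.1, 0.6])
learning = np.array([0.1, 0.2, 0.6, 0.1])

def cosine(a, b):
    # Cosine similarity between two dense word vectors
    return a @ b / (np.linalg.norm(a) * np.linalg.norm(b))

# With these toy numbers, apple is much closer to learning than to machine
print(cosine(apple, learning))   # ~0.98
print(cosine(apple, machine))    # ~0.52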

3. How to Learn Word2Vec

The goal we want to achieve: words with high similarity should cluster together.

Motivation: the closer two words appear to each other in text, the more similar they are.

CBOW model: predict the middle word from its surrounding words.

Skip-Gram: predict the surrounding words from the current word.

Below we take Skip-Gram as the example; the reasoning for CBOW is similar.

How many surrounding words to take is controlled by a hyperparameter, the window size (window_size).

\(sentence = (v_1, v_2, v_3, v_4, v_5, v_6)\)

Suppose we take \(v_3\) as the center word and window_size = 2; then we want to maximize:

\(q(v_3) = p(v_1|v_3)p(v_2|v_3)p(v_4|v_3)p(v_5|v_3)\)
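As a concrete sketch (the helper below is hypothetical and separate from the PyTorch implementation later), the (center, context) pairs Skip-Gram trains on for this sentence with window_size = 2 can be enumerated like this:

def skipgram_pairs(sentence, window_size=2):
    # Enumerate (center, context) training pairs for Skip-Gram
    pairs = []
    for i, center in enumerate(sentence):
        start = max(0, i - window_size)
        end = min(len(sentence), i + window_size + 1)
        for j in range(start, end):
            if j != i:
                pairs.append((center, sentence[j]))
    return pairs

sentence = ['v1', 'v2', 'v3', 'v4', 'v5', 'v6']
# For the center word v3 this yields (v3, v1), (v3, v2), (v3, v4), (v3, v5)
print(skipgram_pairs(sentence, window_size=2))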

For the whole sentence, what we actually want to maximize is:

\(\arg \max_\theta \; q(v_1) \cdot q(v_2) \cdot q(v_3) \cdot q(v_4) \cdot q(v_5) \cdot q(v_6)\)

This can also be written as:

\(\arg \max_\theta \prod_{w \in sentence} \prod_{c \in neb(w)} p(c \mid w; \theta)\)

where \(w\) is the center word and \(c\) ranges over the words around it.

Taking the logarithm of the expression above gives:

\(\arg \max_\theta \sum_{w \in sentence} \sum_{c \in neb(w)} \log p(c \mid w; \theta)\)

The model parameters \(\theta\) can be written as:

\(\theta = [U, V]\)

\(U\) is an \(N \times K\) matrix, where \(N\) is the number of words in the vocabulary and \(K\) is the dimensionality of each word vector; \(V\) has the same shape as \(U\). Here \(V\) holds the center-word vectors and \(U\) holds the context-word vectors.

The more often \((c, w)\) appear together, the larger \(p(c \mid w; \theta)\) should be:

\(p(c \mid w; \theta) = \frac {\exp(U_c \cdot V_w)} {\sum_{c'} \exp(U_{c'} \cdot V_w)}\)

where \(c'\) ranges over all words in the vocabulary.
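A minimal sketch of this softmax, assuming U and V are hypothetical N×K numpy matrices of context-word and center-word vectors (toy random values, not trained):

import numpy as np

N, K = 6, 4                       # vocabulary size and vector dimension (toy values)
rng = np.random.default_rng(0)
U = rng.normal(size=(N, K))       # context-word vectors
V = rng.normal(size=(N, K))       # center-word vectors

def p_context_given_center(c, w):
    # p(c | w; theta) = exp(U_c . V_w) / sum_{c'} exp(U_{c'} . V_w)
    scores = U @ V[w]             # one score per candidate context word
    scores -= scores.max()        # for numerical stability
    probs = np.exp(scores) / np.exp(scores).sum()
    return probs[c]

print(p_context_given_center(c=1, w=3))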

So what we finally want to solve is:

\(\arg \max_\theta \sum_{w \in sentence} \sum_{c \in neb(w)} \left[ U_c \cdot V_w - \log \sum_{c'} \exp(U_{c'} \cdot V_w) \right]\)

Optimizing this objective directly with SGD is too expensive, because the normalization term sums over the entire vocabulary. The following methods can be used to make it tractable:

  1. Negative Sampling
  2. Hierarchical Softmax

4. The Skip-Gram Objective Function

The objective can also be written in another form. Using the same example:

\(sentence = (v_1, v_2, v_3, v_4, v_5, v_6)\)

Suppose window_size = 2. We can treat each word pair as a binary classification problem: for a true context pair such as \((v_2, v_3)\), we want the probability that it is a context pair to approach 1 and the probability that it is not to approach 0:

\(p(y=1 \mid v_2, v_3) = \frac {1} {1 + \exp (-U_{v_2} \cdot V_{v_3})} \rightarrow 1\)

\(p(y=0 \mid v_2, v_3) = 1 - \frac {1} {1 + \exp (-U_{v_2} \cdot V_{v_3})} \rightarrow 0\)
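Here \(\frac{1}{1+\exp(-x)}\) is the sigmoid (logistic) function, written \(\sigma\) below:

\(\sigma(x) = \frac{1}{1 + \exp(-x)}\)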

So the objective function can be written as:

\(\arg \max_\theta \prod_{(w, c) \in D} p(y=1 \mid w, c; \theta) \prod_{(w, c) \in \hat D} p(y=0 \mid w, c; \theta)\)

Here \(w\) is the center word and \(c\) a candidate context word; \(D\) is the set of pairs that truly occur as context (positive samples), and \(\hat D\) is the set of pairs that do not (negative samples).

The objective can then be written as:

\(\arg \max_\theta \prod_{(w, c) \in D} \frac {1} {1 + \exp(-U_c \cdot V_w)} \prod_{(w, c) \in \hat D} \left[1 - \frac {1} {1 + \exp(-U_c \cdot V_w)}\right]\)

Taking the logarithm:

\(\arg \max_\theta \sum_{(w, c) \in D} \log \frac {1} {1 + \exp(-U_c \cdot V_w)} + \sum_{(w, c) \in \hat D} \log \left[1 - \frac {1} {1 + \exp(-U_c \cdot V_w)}\right]\)

Clearly the set of negative pairs \(\hat D\) is far too large, so we only sample a few negatives per positive pair (Negative Sampling).
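In the original word2vec formulation, the negatives are drawn not from the raw unigram distribution \(U(w)\) but from the unigram distribution raised to the 3/4 power, which boosts rarer words; this is also what the noise distribution in the PyTorch code below implements:

\(P_n(w) = \frac{U(w)^{3/4}}{\sum_{w'} U(w')^{3/4}}\)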

The objective then becomes:

\(\arg \max_\theta \sum_{(w, c) \in D} \left[ \log \sigma(U_c \cdot V_w) + \sum_{c' \notin neb(w)} \log \sigma(-U_{c'} \cdot V_w) \right]\)

Tip: \(e^{-x} / (1 + e^{-x}) = 1/(1+e^{x})\), i.e. \(1 - \sigma(x) = \sigma(-x)\).
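For the gradients below, the only additional fact needed is the derivative of the log-sigmoid, which follows from \(\sigma'(x) = \sigma(x)[1-\sigma(x)]\) and the identity above:

\(\frac{d}{dx} \log \sigma(x) = \frac{\sigma(x)[1-\sigma(x)]}{\sigma(x)} = 1 - \sigma(x) = \sigma(-x)\)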

Taking derivatives with respect to the parameters:

\(\frac {\partial l(\theta)} {\partial U_c} = \frac {\sigma(U_c \cdot V_w)[1-\sigma(U_c \cdot V_w)] \, V_w} {\sigma(U_c \cdot V_w)} = [1-\sigma(U_c \cdot V_w)] \, V_w\)

Similarly:

\(\frac {\partial l(\theta)} {\partial U_{c'}} = -[1-\sigma(-U_{c'} \cdot V_w)] \, V_w\)

\(\frac {\partial l(\theta)} {\partial V_w} = [1-\sigma(U_c \cdot V_w)] \, U_c - \sum_{c' \notin neb(w)}[1-\sigma(-U_{c'} \cdot V_w)] \, U_{c'}\)

Then we simply update the parameters along these gradients (see the sketch below).
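A minimal sketch of one such update, assuming U and V are numpy matrices as before and negatives is a list of sampled negative word indices (the learning rate lr is a hypothetical hyperparameter); since we are maximizing, the step is gradient ascent:

import numpy as np

def sigma(x):
    return 1.0 / (1.0 + np.exp(-x))

def ascent_step(U, V, w, c, negatives, lr=0.025):
    # One gradient-ascent step for a positive pair (w, c) and its sampled negatives
    grad_Vw = (1 - sigma(U[c] @ V[w])) * U[c]
    grad_Uc = (1 - sigma(U[c] @ V[w])) * V[w]
    for c_neg in negatives:
        grad_Vw -= (1 - sigma(-U[c_neg] @ V[w])) * U[c_neg]
        U[c_neg] -= lr * (1 - sigma(-U[c_neg] @ V[w])) * V[w]   # negative context words
    U[c] += lr * grad_Uc                                        # positive context word
    V[w] += lr * grad_Vw                                        # center word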

5. Evaluating Word Vectors

  1. Project the word vectors into a two-dimensional space (e.g. with t-SNE) and inspect them visually, for example checking whether similar words end up close together;
  2. Sample word pairs and compute a similarity measure such as cosine similarity;
  3. Analogy tests, e.g. woman : man = girl : ?, then check how close the answer is to boy (checks 2 and 3 are sketched below).
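A minimal sketch of checks 2 and 3, assuming embeddings is a hypothetical N×K numpy matrix of trained word vectors and word2idx / idx2word are the vocabulary mappings:

import numpy as np

def most_similar(word, embeddings, word2idx, idx2word, topn=5):
    # Top-n nearest words by cosine similarity
    vecs = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    sims = vecs @ vecs[word2idx[word]]
    best = np.argsort(-sims)[1: topn + 1]        # skip the query word itself
    return [(idx2word[i], float(sims[i])) for i in best]

def analogy(a, b, c, embeddings, word2idx, idx2word):
    # a : b = c : ?   e.g. woman : man = girl : ?  (expected answer: boy)
    vecs = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    query = vecs[word2idx[b]] - vecs[word2idx[a]] + vecs[word2idx[c]]
    sims = vecs @ query
    for i in np.argsort(-sims):
        if idx2word[i] not in (a, b, c):
            return idx2word[i]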

6. Drawbacks of Skip-Gram

  1. It does not take context into account;
  2. The window length is limited, so global information cannot be captured;
  3. Low-frequency words and out-of-vocabulary (OOV) words cannot be learned effectively;
  4. Word order is ignored;
  5. Different senses of a polysemous word cannot be distinguished.

The context problem can be addressed with ELMo and BERT; low-frequency and OOV words can be handled with subword embeddings.

7. PyTorch Implementation

import numpy as np
import torch
from torch import nn, optim
import random
from collections import Counter
 
 
"""
- Data preprocessing
- Build the loss and the network
- Train the model
"""
 
 
"""
Read the corpus file
"""
def get_data(file_name):
    with open(file_name) as f:
        text = f.read()
    return text
 
text = get_data('test.txt')
print(text)
 
 
"""
Preprocess the text
"""
def preprocess(text, freq=0):
    # Lowercase the text
    text = text.lower()

    # Strip special symbols (more punctuation could be handled here)
    text = text.replace('.', '')

    # For English, tokenize by whitespace
    words = text.split()

    # Count word occurrences
    word_count = Counter(words)

    # Drop words whose count is not above the threshold
    trimmed_words = [word for word in words if word_count[word] > freq]

    return trimmed_words
 
 
"""
Preparation: vocabulary, word indices, and the training word sequence
"""
def prepair_train_data(text):
    words = preprocess(text)
    vocab = set(words)
    # word ==> index
    vocab2index = {w: c for c, w in enumerate(vocab)}
    # index ==> word
    index2vocab = {c: w for c, w in enumerate(vocab)}

    # Convert every word in the text to its index
    index_words = [vocab2index[w] for w in words]

    index_word_counts = Counter(index_words)
    # Total number of words
    total_count = len(index_words)
    # Relative frequency of each word
    word_freqs = {w: c / total_count for w, c in index_word_counts.items()}

    # Probability of dropping each word (subsampling of frequent words)
    t = 1e-5
    prob_drop = {w: 1 - np.sqrt(t / word_freqs[w]) for w in index_word_counts}
    # Words to keep; subsampling is disabled here and all words are used
    # train_words = [w for w in index_words if random.random() < (1 - prob_drop[w])]
    train_words = index_words

    return train_words, index2vocab, vocab2index, word_freqs


"""
Compute the noise (negative-sampling) distribution over words
"""
def cal_distribution(word_freqs):
    # todo: sort the words first, then compute the distribution
    word_freqs = np.array(list(word_freqs.values()))
    unigram_dist = word_freqs / word_freqs.sum()
    # Raise the unigram distribution to the 3/4 power, as in word2vec
    noise_dist = torch.from_numpy(unigram_dist ** 0.75 / np.sum(unigram_dist ** 0.75))
    return noise_dist


"""
Get the surrounding (target) words of a center word
"""
def get_target(words, idx, window_size=5):
    # Randomly shrink the window size
    target_window = np.random.randint(2, window_size + 1)
    # Start index
    start_index = idx - target_window if (idx - target_window) > 0 else 0

    # End index
    end_point = idx + target_window

    # Collect the surrounding words
    targets = set(words[start_index: idx] + words[idx + 1: end_point + 1])

    return list(targets)
 
 
"""
Batch iterator
"""
def get_batch(words, batch_size, window_size):
    # How many full batches the words can be split into
    n_batches = len(words) // batch_size

    # Trim the words so their number is a multiple of batch_size
    words = words[: n_batches * batch_size]

    for idx in range(0, len(words), batch_size):
        batch_x, batch_y = [], []
        # Take one batch of words
        batch = words[idx: idx + batch_size]
        # Within the batch, use each word in turn as the center word and collect its context words
        for i in range(len(batch)):
            x = batch[i]
            y = get_target(batch, i, window_size)
            # Repeat x so that batch_x and batch_y have the same length
            batch_x.extend([x] * len(y))
            batch_y.extend(y)
        yield batch_x, batch_y
 
 
"""
Define the network
"""
class SkipGramNeg(nn.Module):
    def __init__(self, n_vocab, n_embed, noise_dist=None):
        """
        :param n_vocab: 单词个数
        :param n_embed: embedding 个数
        :param noise_dist: noise distribution 为了负采样
        """
        super().__init__()
 
        self.n_vocab = n_vocab
        self.n_embed = n_embed
        self.noise_dist = noise_dist
 
        # Define the input (center) and output (context) embedding layers
        self.in_embed = nn.Embedding(n_vocab, n_embed)
        self.out_embed = nn.Embedding(n_vocab, n_embed)
 
        # Initialize the weights for better convergence
        self.in_embed.weight.data.uniform_(-1, 1)
        self.out_embed.weight.data.uniform_(-1, 1)
 
    def forward_input(self, input_words):
        # Embed the input (center) words
        input_vectors = self.in_embed(input_words)
        return input_vectors
 
    def forward_output(self, output_words):
        # Embed the output (context) words
        output_vectors = self.out_embed(output_words)
        return output_vectors
 
    def forward_noise(self, batch_size, n_sample):
        """生成noise vectors, shape(batch_size, n_samples, n_embed)"""
        if self.noise_dist is None:
            # Sample words uniformly
            noise_dist = torch.ones(self.n_vocab)
        else:
            noise_dist = self.noise_dist
 
        # Sample negative words from the noise distribution (multinomial sampling)
        noise_words = torch.multinomial(noise_dist,
                                        batch_size * n_sample,
                                        replacement=True)
 
        noise_vect = self.out_embed(noise_words).view(batch_size, n_sample, self.n_embed)
 
        return noise_vect
 
 
"""
Define the loss function
"""
class NegativeSamplingLoss(nn.Module):
    def __init__(self):
        super(NegativeSamplingLoss, self).__init__()
 
    def forward(self, input_vector, output_vector, noise_vectors):
        batch_size, embed_size = input_vector.shape
 
        # Reshape the input and output vectors for batch matrix multiplication
        input_vector = input_vector.view(batch_size, embed_size, 1)
        output_vector = output_vector.view(batch_size, 1, embed_size)
 
        # bmm = batch matrix multiplication
        # log-sigmoid loss for the true (positive) context pairs
        out_loss = torch.bmm(output_vector, input_vector).sigmoid().log()
        out_loss = out_loss.squeeze()
 
        # log-sigmoid loss for the negative samples
        noise_loss = torch.bmm(noise_vectors.neg(), input_vector).sigmoid().log()
        noise_loss = noise_loss.squeeze().sum(1)
 
        return -(out_loss + noise_loss).mean()
 
 
"""
Train the model
"""
def train_model():
    train_words, index2vocab, vocab2index, word_freqs = prepair_train_data(text)
    noise_dist = cal_distribution(word_freqs)
 
    # Initialize the model
    embedding_dim = 300
    model = SkipGramNeg(len(vocab2index), embedding_dim, noise_dist)
 
    # Define the loss and the optimizer
    criterion = NegativeSamplingLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.003)
 
    print_every = 1
    steps = 0
    epoch = 5
    batch_size = 50
    n_samples = 5
 
    for e in range(epoch):
        for input_words, target_words in get_batch(train_words, batch_size, window_size=5):
            steps += 1
            inputs, targets = torch.LongTensor(input_words), torch.LongTensor(target_words)
            input_vectors = model.forward_input(inputs)
            output_vectors = model.forward_output(targets)
            current_batch_size = len(inputs)
            noise_vectors = model.forward_noise(current_batch_size, n_samples)
 
 
            loss = criterion(input_vectors, output_vectors, noise_vectors)
 
            if steps % print_every == 0:
                print('loss', loss)
 
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
 
 
if __name__ == '__main__':
    train_model()
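
After training, the learned word vectors are the rows of the input embedding table (model.in_embed.weight). A minimal usage sketch, assuming train_model is modified to return model and vocab2index (the version above returns nothing):

model, vocab2index = train_model()
embeddings = model.in_embed.weight.data.numpy()      # shape: (n_vocab, n_embed)
vector = embeddings[vocab2index['learning']]         # vector for the word "learning", if it is in the vocabulary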
