词向量表示

调包实现word2vec

实验介绍

[!question] 借助gensim工具包,利用给定的训练语料训练word2vec(CBOW)
训练后实现:
(1)自选5对词语相似度计算
(2)任选5个词,找出与他们最接近的5个语义相似的词
(3)计算爸爸-男人+女人=?

词的表示方法:
由于机器学习方法通常只能接受向量作为输入,因此在NLP中使用机器学习方法时,往往需要将词表示为向量。向量表示更为规范,通常由固定的维度,并且易于进行机器学习算法中的各类运算

  • 独热编码:最简单的方式,但是会带来维度灾难,且无法表示词与词之间的相关性
  • 分布式表示
    • 稀疏向量表示:词-词共现矩阵
    • 稠密向量表示:传统方法是基于SVD的潜在语义分析,近期方法有word2vec和Glove,现在常用上下文相关词嵌入

实验1只需要调用gensim模块实现即可,该模块封装了Word2Vec模型,可以自由选择调用skip-gram和CBOW,同时还有其他参数可供调节

image.png

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from gensim.models import Word2Vec
from gensim.utils import simple_preprocess

# 读入文件与预处理
with open('homework3_input.txt', encoding = 'utf-8') as f:
text = f.read()

sentences = text.split('\n')

corpus = [sentence.split(' ') for sentence in sentences]
# print(corpus)
# print(type(corpus))

# 传入的是二维列表!
# corpus = [simple_preprocess(sentence) for sentence in sentences]
# print(corpus)

# 模型训练
model = Word2Vec(sentences=corpus, vector_size=100, window=5, min_count=1, workers=4, epochs=5, min_alpha=0.005, sg=0)

# 模型保存
model.save("word2vec.model")

# 模型加载
model = Word2Vec.load("word2vec.model")

# 相似度计算函数
def similarity_compute(word1, word2):
similarity = model.wv.similarity(word1, word2)
print(f"{word1}{word2} 相似度为:{similarity}")

# 找到前5个相近词
def most_similar(word):
simi_words = model.wv.most_similar(word, topn=5)
print(f"{word}的前五个相近词:")
for ans, score in simi_words:
print(f"{ans} : {score}")

# 计算5组词之间的相似度
similarity_compute('俄罗斯', '乌克兰')
similarity_compute('舅舅', '叔叔')
similarity_compute('证券', '金融')
similarity_compute('足球', '篮球')
similarity_compute('火锅', '麻辣烫')

# 寻找相近词
most_similar('足球')
most_similar('新华社')
most_similar('舅舅')
most_similar('欧洲')
most_similar('心脏')

# 计算'爸爸-男人+女人'的结果
result = model.wv.most_similar(positive=['爸爸', '女人'], negative=['男人'], topn=5)
for word, score in result:
print(f"可能的答案: {word}, 相似度:{score}")

注意事项

  • 读取文件时,首先要按照换行符切割成每行单独的列表,然后再按照空格对每行分别处理,形成每行按照词进行分割的二维列表
  • 这里也可以使用 gensim 提供的处理函数 simple_preprocess 函数,效果和按照空格切割类似,只是还会舍弃一些停用词
  • 训练模型时如果觉得效果不好,可以尝试修改超参数

实验结果

image.png

句子的向量表示

实验介绍

[!question] 利用transformers中的Bert和GPT2包,任选5对中文与英文句子计算相似度

句子同样也可以像词一样变成向量表示,通过bert、gpt等预训练模型,可将输入的句子转化为向量,从而具有可运算的性质

BERT和GPT是两种基于Transformer架构的预训练模型,BERT侧重于理解句⼦中的上下⽂和含义,适合词语级别的任务;⽽GPT则专注于⽣成连贯的⽂本,适⽤于⽣成式任务。两者在训练⽅式、任务⽬标和适⽤场景上有所不同,BERT使⽤掩码语⾔模型和下⼀句预测,GPT采⽤⾃回归语⾔模型。

注意:Bert和GPT2都是动态词向量(上下文嵌入),每个词的向量表示不是固定的,而是根据上下文变动

⽂本相似度计算思路

⾸先使⽤BERT的分词器对输⼊的单词进⾏编码,然后将编码后的数据输⼊到BERT模型中获取嵌⼊向量。随后,对这些向量进⾏平均池化处理以获得更加稳定的特征表⽰,最后通过余弦相似度函数计算两个嵌⼊向量之间的相似度。这种⽅法结合了BERT的深层语义理解能⼒和余弦相似度的直观度量⽅式,能有效地评估两个⽂本之间的语义接近程度。

⽂本编码

在BERT中,⽂本⾸先通过⼀个分词器(Tokenizer)处理,该分词器将原始输⼊⽂本转换为模型可以理解的格式,包括将单词转换为词汇表中的索引、添加特殊的分隔符(如[CLS]和[SEP]),以及⽣成对应的注意⼒掩码(Attention Mask)。这⼀步是处理⽂本数据的关键,它直接关系到后续模型能否正确理解和处理输⼊数据。

嵌⼊向量的获取

通过BERT模型对编码后的⽂本进⾏处理后,可以获取到每个输⼊token的嵌⼊表⽰。这些表⽰是在模型的多层⽹络结构中⽣成的,其中每⼀层都通过⾃注意⼒机制前馈⽹络计算得到新的表⽰。在⾃然语⾔处理任务中,通常使⽤模型最后⼀层的输出作为最终的特征表⽰,因为它们包含了经过多层处理后的⾼级语义信息。

余弦相似度的计算

余弦相似度是⼀种常⽤的相似度度量⽅式,它通过计算两个向量的夹⻆余弦值来评估它们的相似度。在⽂本处理中,将两个⽂本的嵌⼊向量进⾏余弦相似度计算,可以得到⼀个介于-1和1之间的标量值,表⽰这两个⽂本在语义上的接近程度。值越接近1,表⽰语义相似度越⾼;值越接近-1,表⽰语义差异越⼤。

代码

Bert 中文相似度

  • model.to(device) 将模型移动到指定的设备(GPU 或 CPU)
  • model.eval() 将模型设置为评估模式(关闭 dropout 等训练时的特殊操作)
  • **tokenizers(sentence, return_tensors='pt', truncation=True, max_length=128)**:
    • 将输入的句子分词,并转换为模型可以处理的张量格式。
    • return_tensors='pt':返回 PyTorch 张量。
    • truncation=True:如果句子长度超过 max_length,则截断。
    • max_length=128:设置句子的最大长度为 128。
  • **inputs = {k: v.to(device) for k,v in inputs.items()}**:
    • 将输入数据移动到指定的设备(GPU 或 CPU)。
  • **with torch.no_grad():**:
    • 禁用梯度计算,因为在评估模式下不需要计算梯度。
  • **outputs = model(**inputs)**:
    • 将输入数据传递给 Bert/GPT-2 模型,得到输出。
  • **cls_embedding = outputs.last_hidden_state[:, 0, :]**:
    • 提取模型输出的最后一层隐藏状态(last_hidden_state),并取第一个 token([:, 0, :])作为句子的嵌入表示。
    • 这里假设第一个 token 是句子的整体表示(类似于 BERT 的 [CLS] token)。
  • **return cls_embedding.cpu().numpy()**:
    • 将句子的嵌入表示从 GPU 移动到 CPU,并转换为 NumPy 数组。
  • cosine_similarity 返回(1,1)的矩阵,需要用 [1][1] 进行提取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import torch
from transformers import BertTokenizer
from transformers import BertModel
from sklearn.metrics.pairwise import cosine_similarity

# 中文Bert
model_name = '../第二周/bert-base-chinese/bert-base-chinese'
tokenizers = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 这句话不要忘了是字符串...

model.to(device) # 将模型移动到指定的设备(GPU 或 CPU)
model.eval() # 将模型设置为评估模式(关闭 dropout 等训练时的特殊操作)

def get_sentence_embedding(sentence):
inputs = tokenizers(sentence, return_tensors='pt', truncation=True, max_length=128) #
inputs = {k: v.to(device) for k,v in inputs.items()} # 将输入数据移动到指定的设备(GPU 或 CPU)

with torch.no_grad():
outputs = model(**inputs) # 函数解包,将inputs传给模型,得到输出

cls_embedding = outputs.last_hidden_state[:, 0, :] # 取出[CLS]对应的向量作为句子的embedding
return cls_embedding.cpu().numpy() # 将embedding从GPU转回CPU,并转为numpy数组返回

def compute_sentence_similarity(sentence1, sentence2):
embedding1 = get_sentence_embedding(sentence1) # 这里的embedding1就是句子1的embedding
embedding2 = get_sentence_embedding(sentence2) # 这里的embedding2就是句子2的embedding

sim = cosine_similarity(embedding1, embedding2)[0][0] # 会返回如$$[0.9926]] 这样的(1,1)矩阵结果,需要用[0][0]进行提取
print(f"句子相似度: {sim:.4f}")

sentence1 = "中国队大胜尼日利亚队"
sentence2 = "尼日利亚队大败中国队"
sentence3 = '吃葡萄不吐葡萄皮'
sentence4 = '不吃葡萄倒吐葡萄皮'
sentence5 = '夏洛特烦恼'
sentence6 = '西红市首富赚了100万,于是开了开心麻花'
sentence7 = '他正在学习自然语言处理'
sentence8 = '他正在学习人工智能技术'
sentence9 = '这部电影非常精彩,值得一看'
sentence10 = '这部电影太无聊了,浪费时间'

compute_sentence_similarity(sentence1, sentence2)
compute_sentence_similarity(sentence3, sentence4)
compute_sentence_similarity(sentence5, sentence6)
compute_sentence_similarity(sentence7, sentence8)
compute_sentence_similarity(sentence9, sentence10)

image.png

Bert 英文相似度

没有多少区别,只需要把model_name更换,加载英文模型即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 英文Bert
model_name = '../第二周/bert-base-cased/bert-base-cased'
tokenizers = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)

'''中间省略'''

sentence1 = "The weather is nice today, and the sun is shining."
sentence2 = "I love you."
sentence3 = "I enjoy eating apples and bananas."
sentence4 = "Apples and bananas are my favorite fruits."
sentence5 = "He is studying natural language processing."
sentence6 = "The cat is sleeping."
sentence7 = "This movie is fantastic and worth watching."
sentence8 = "That bookis so boring and a waste of time."
sentence9 = "She went to Beijing yesterday."
sentence10 = "He has married a beautiful girl."

image.png

GPT2 英文相似度

只需要把Bert的加载模型换成以下即可:

1
2
3
4
5
6
7
8
import torch
from transformers import GPT2Tokenizer
from transformers import GPT2Model
from sklearn.metrics.pairwise import cosine_similarity

model_name = './gpt2'
tokenizers = GPT2Tokenizer.from_pretrained(model_name)
model = GPT2Model.from_pretrained(model_name)

image.png

不过看英文的相似度的时候,感觉英文句子之间的相似度似乎都很高。。。

注意 **input 的作用

在 Python 中,**inputs 是一种特殊的语法,用于将字典解包为关键字参数(keyword arguments)。具体来说,它会将字典中的键值对解包为函数的命名参数。

  1. **inputs 的作用

假设 inputs 是一个字典,例如:

1
2
3
4
inputs = {
'input_ids': tensor1,
'attention_mask': tensor2
}

那么 **inputs 会将这个字典解包为:

1
input_ids=tensor1, attention_mask=tensor2
  1. 代码中的具体应用

在代码中:

1
outputs = model(**inputs)
  • model 是 GPT-2 模型,它的 forward 方法需要一些命名参数,例如 input_idsattention_mask
  • inputs 是一个字典,包含了这些参数:
    1
    2
    3
    4
    inputs = {
    'input_ids': tensor1, # 输入的 token IDs
    'attention_mask': tensor2 # 注意力掩码
    }
  • **inputs 会将字典解包为:
    1
    model(input_ids=tensor1, attention_mask=tensor2)
  1. 为什么需要 **inputs
  • GPT-2/Bert 模型的 forward 方法需要明确的命名参数(如 input_idsattention_mask),而不是一个字典。
  • 使用 **inputs 可以方便地将字典解包为命名参数,避免手动写:
1
outputs = model(input_ids=inputs['input_ids'], attention_mask=inputs['attention_mask'])
  1. 示例

假设 inputs 是以下字典:

1
2
3
4
inputs = {
'input_ids': torch.tensor([[1, 2, 3]]),
'attention_mask': torch.tensor([[1, 1, 1]])
}

那么 model(**inputs) 等价于:

1
model(input_ids=torch.tensor([[1, 2, 3]]), attention_mask=torch.tensor([[1, 1, 1]]))

封装 word2vec

[!question] 这个task需要将给的 word2vec.py 拆开封装,至少包含 configs, models, dataloader, trainers, main等部分

为了养成良好的代码管理习惯,封装是非常重要的。
在github上,我们通常会看到代码往往不会都写在一个文件中,而是拆开了多个模块,通过调用接口来实现功能。这样的好处是便于进行修改和debug

深度学习的代码,通常包括几个文件/文件夹:

  • Models:用于存放所有模型代码
  • Algorithm:用于存放所有算法代码
  • Trainers:用于存放训练/测试流程代码
  • Dataloader:用于存放读写数据/数据预处理部分代码
  • Configs:用于存放所有超参数设置部分
  • Checkpoints:用于存放模型参数
  • Main:用于存放主要流程

此外,通常会有utils文件夹和models文件夹用于存放一些工具和模块

代码处理

我将代码重构成了以下文件,使用tree命令查看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
word2vec:.
│ main.py
│ trainers.py
├─data
│ homework3_input.txt
├─models
│ │ cbow.py
│ │ skip_gram.py
│ │ __init__.py
│ └─__pycache__
├─utils
│ │ configs.py
│ │ dataloader.py
│ │ metrics.py
│ │ negative_sampler.py
│ │ __init__.py
│ └─__pycache__
└─__pycache__

models

里面存放cbow和ckip-gram代码,此处与CBOW为例,skip-gram类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import torch
import torch.nn as nn

# 定义CBOW模型
class CBOWModel(nn.Module):
def __init__(self, vocab_size, embedding_dim):
'''参数初始化'''
super(CBOWModel, self).__init__() # 继承父类(nn.Module)的初始化方法
self.vocab_size = vocab_size # 词汇表大小
self.embedding_dim = embedding_dim # 词向量维度
self.input_embeddings = nn.Embedding(vocab_size, embedding_dim) # 输入词向量嵌入层
self.output_embeddings = nn.Embedding(vocab_size, embedding_dim) # 输出词向量嵌入层
self.init_emb() # 初始化词向量

def init_emb(self):
initrange = 0.5 / self.embedding_dim # 随机初始化范围
self.input_embeddings.weight.data.uniform_(-initrange, initrange) # 输入词向量初始化, uniform_()方法用于将权重张量初始化为均匀分布
self.output_embeddings.weight.data.uniform_(-0, 0) # 输出词向量初始化

def forward(self, context_words, target_words, negative_words):
'''
context_words: 上下文词的索引,形状为 (batch_size, context_size)
target_words: 目标词的索引,形状为 (batch_size)
negative_words: 负采样的词索引,形状为 (batch_size, num_negatives)
'''
# 将上下文词通过输入嵌入层转换为向量,得到形状 (batch_size, context_size, embedding_dim)
context_emb = self.input_embeddings(context_words) # (batch, context_size, embed_dim)
# 对上下文词的嵌入取平均,得到单个向量表示整个上下文,形状变为 (batch_size, embedding_dim)
context_emb = torch.mean(context_emb, dim=1) # (batch, embed_dim)
# 目标词和负样本通过输出嵌入层转换为向量,形状分别为 (batch_size, embedding_dim) 和 (batch_size, num_negatives, embedding_dim)。
target_emb = self.output_embeddings(target_words) # (batch, embed_dim)
negative_emb = self.output_embeddings(negative_words) # (batch, neg, embed_dim)

# 计算正样本的损失
# 分数计算:上下文向量与目标词向量的逐元素乘积之和(点积),表示两者的相似性。
positive_score = torch.sum(context_emb * target_emb, dim=1) # (batch)
# 损失转换:通过sigmoid将分数映射到(0,1)区间,取对数并添加极小值(1e-10)防止数值溢出。
positive_loss = torch.log(torch.sigmoid(positive_score) + 1e-10)

# 计算负样本的损失
# 分数计算:context_emb.unsqueeze(2) 扩展维度为 (batch, embed_dim, 1),torch.bmm(批量矩阵乘法)计算每个负样本与上下文向量的点积,得到形状 (batch_size, num_negatives)
negative_score = torch.bmm(negative_emb, context_emb.unsqueeze(2)).squeeze() # (batch, neg)
# 损失转换:对负样本分数取负号(因为希望负样本得分低),通过sigmoid和log计算损失,并对所有负样本求和。
negative_loss = torch.sum(torch.log(torch.sigmoid(-negative_score) + 1e-10), dim=1) # (batch)

loss = -(positive_loss + negative_loss) # (batch),负号是为了最小化loss
return loss.mean() # 平均loss

其中,里面的forward方法需要注意一下,以CBOW为例:
输入参数

  • context_words: 上下文词的索引,形状为 (batch_size, context_size)
  • target_words: 目标词的索引,形状为 (batch_size)
  • negative_words: 负采样的词索引,形状为 (batch_size, num_negatives)

步骤1:嵌入层操作

  1. 上下文嵌入

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
       context_emb = self.input_embeddings(context_words)  # (batch, context_size, embed_dim)
    ```

    - 将上下文词通过输入嵌入层转换为向量,得到形状 `(batch_size, context_size, embedding_dim)`。
    - **取平均**:

    ```python
    context_emb = torch.mean(context_emb, dim=1) # (batch, embed_dim)
    ```

    - 对上下文词的嵌入取平均,得到单个向量表示整个上下文,形状变为 `(batch_size, embedding_dim)`。

    2. **目标词和负样本嵌入**

    ```python
    target_emb = self.output_embeddings(target_words) # (batch, embed_dim)
    negative_emb = self.output_embeddings(negative_words) # (batch, neg, embed_dim)
    ```

    - 目标词和负样本通过输出嵌入层转换为向量,形状分别为 `(batch_size, embedding_dim)` 和 `(batch_size, num_negatives, embedding_dim)`。

    #### **步骤2:计算正样本损失**

    ```python
    positive_score = torch.sum(context_emb * target_emb, dim=1) # (batch)
    positive_loss = torch.log(torch.sigmoid(positive_score) + 1e-10)
  • 分数计算:上下文向量与目标词向量的逐元素乘积之和(点积),表示两者的相似性。
  • 损失转换:通过sigmoid将分数映射到(0,1)区间,取对数并添加极小值(1e-10)防止数值溢出。

步骤3:计算负样本损失

1
2
negative_score = torch.bmm(negative_emb, context_emb.unsqueeze(2)).squeeze()  # (batch, neg)
negative_loss = torch.sum(torch.log(torch.sigmoid(-negative_score) + 1e-10), dim=1)
  • 分数计算
    • context_emb.unsqueeze(2) 扩展维度为 (batch, embed_dim, 1)
    • torch.bmm(批量矩阵乘法)计算每个负样本与上下文向量的点积,得到形状 (batch_size, num_negatives)
  • 损失转换
    • 对负样本分数取负号(因为希望负样本得分低),通过sigmoidlog计算损失,并对所有负样本求和。

步骤4:总损失

1
loss = -(positive_loss + negative_loss).mean()
  • 合并正负样本损失,取负数(因优化器默认最小化损失),最后取批次平均值。

CBOW与Skip-gram关键区别

步骤 CBOW Skip-gram
输入处理 上下文词取平均得到单个向量 直接使用中心词向量
负样本计算维度 (batch, neg, embed_dim) @ (batch, embed_dim, 1) 同左
目标对象 从上下文预测中心词 从中心词预测上下文

数学原理

  • 正样本损失:最大化目标词与上下文(CBOW)或中心词(Skip-gram)的相似性。
    $$
    \text{positive_loss} = \log \sigma(\mathbf{v}{\text{context}} \cdot \mathbf{v}{\text{target}})
    $$
  • 负样本损失:最小化负样本与上下文的相似性。
    $$
    \text{negative_loss} = \sum_{i=1}^{k} \log \sigma(-\mathbf{v}{\text{context}} \cdot \mathbf{v}{\text{negative}_i})
    $$
  • 总损失
    $$
    \mathcal{L} = -\left(\text{positive_loss} + \text{negative_loss}\right)
    $$

代码设计要点

  1. 双嵌入层input_embeddingsoutput_embeddings分别用于输入词和输出词(即老师课上讲的中心词和上下文词向量,根据模型不同有所区别),增强模型表达能力。
  2. 负采样加速:通过批量矩阵乘法(torch.bmm)高效计算多个负样本的分数。
  3. 数值稳定性:添加1e-10避免对零取对数。
  4. 损失符号:负号将极大似然估计转换为最小化问题,与优化器兼容。

utils代码

里面没有做太多的修改

  • config.py存放配置文件,使用Config类表示,同时使用argprase模块便于处理不同的参数情况
  • dataloader.py 存放读取数据、构建词汇表、定义数据集的代码
  • metrics.py 存放余弦相似度计算与查找最相近词代码
  • negative_sampler.py 存放负采样分布生成的代码和负采样器代码

详细说明关于负采样分布部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import numpy as np

# 负采样分布
def get_negative_sampling_distribution(word_counts):
# 使用词频的0.75次方
power = 0.75
print(word_counts)
distribution = word_counts ** power # 计算负采样分布,给词频乘以0.75次方,进行平滑处理
print(distribution)
distribution /= distribution.sum() # 归一化
print(distribution)
return distribution # 返回负采样分布

class NegativeSampler:
def __init__(self, distribution):
self.distribution = distribution # 初始化分布
self.vocab_size = len(distribution) # 设置词汇表大小就是分布大小

def sample(self, batch_size, num_negatives):
# 使用numpy的choice进行批量采样
return np.random.choice(self.vocab_size, size=(batch_size, num_negatives), p=self.distribution) # 按照负采样分布采样,通过设置每个词的采样概率p实现,返回的是一个二维数组,shape=(batch_size, num_negatives),采样的范围是 [0, vocab_size),其中,二维数组的每一行表示每个正样本对应的负样本索引

1. 负采样的目的

在Word2Vec中,原始的损失函数需要计算所有词汇的softmax概率,这会导致计算复杂度与词汇表大小成正比。例如,若词汇表有10万词,每次预测都需要计算10万次点积,效率极低。负采样(Negative Sampling) 通过以下方式优化:

  • 简化计算:将多分类问题转换为二分类问题,只计算正样本和少量负样本的得分。
  • 加速训练:避免遍历整个词表,大幅减少计算量。

2. 负采样分布的数学原理

负采样的核心是如何选择负样本。直接按原始词频采样会导致高频词被过度选择(如“的”、“是”),而低频词几乎被忽略。为解决这一问题,采用修正的词频分布

  • 幂次调整(Subsampling):对词频 $f(w)$ 取 $f(w)^{0.75}$,公式为:
    $$P(w) = \frac{f(w)^{0.75}}{\sum_{w’} f(w’)^{0.75}}$$
  • 作用
    • 降低高频词的采样概率。
    • 提升低频词的采样机会,平衡数据分布。

3. 代码解析

(1) 生成负采样分布 get_negative_sampling_distribution
  • 输入word_counts 是每个词在训练语料中的出现次数。
  • 步骤
    1. 幂次调整:将词频 $f(w)$ 转换为 $f(w)^{0.75}$,抑制高频词。
    2. 归一化:将调整后的值转换为概率分布,确保所有概率之和为1。
  • 示例
    • 假设词A的原始频率为1000,词B为10。
    • 调整后:$1000^{0.75} \approx 177.8$,$10^{0.75} \approx 5.6$。
    • 归一化后,词A的概率从绝对主导(约99%)降低,词B的概率相对提升。
(2) 负采样器类 NegativeSampler
  • 功能:根据预定义的分布批量采样负样本。
  • 参数
    • batch_size:批次大小(正样本数量)。
    • num_negatives:每个正样本对应的负样本数。
  • 输出:形状为 (batch_size, num_negatives) 的数组,每行包含一个正样本对应的多个负样本索引。
  • 示例
    • batch_size=3num_negatives=2,则输出可能是:

      1
      2
      3
      [[5, 12],  # 第1个正样本的2个负样本
      [8, 3], # 第2个正样本的2个负样本
      [1, 7]] # 第3个正样本的2个负样本

4. 负采样在Word2Vec训练中的流程

整体流程

  1. 准备数据:从训练语料中提取正样本(如中心词-上下文词对)。
  2. 生成负样本:对每个正样本,采样 num_negatives 个负样本。
  3. 计算损失
    • 正样本得分:中心词与目标词的点积。
    • 负样本得分:中心词与所有负样本的点积。
    • 损失函数:最大化正样本得分,最小化负样本得分。
  4. 反向传播:根据损失更新词向量参数。

5. 负采样的作用

作用 说明
降低计算复杂度 仅计算正样本和少量负样本的得分,而非整个词表。
平衡高频/低频词 通过幂次调整,减少高频词的过度影响,增加低频词的参与。
提升语义区分能力 迫使模型区分正样本与随机负样本,增强词向量的判别性。

6. 参数选择的影响

  • 幂次值(0.75)
    • 值越小,高频词的抑制越强(极端情况为均匀分布)。
    • 原论文通过实验确定0.75为平衡点。
  • 负样本数量(num_negatives)
    • 数量越多,训练越稳定,但计算量增加。
    • 通常选择5~20个负样本。

7. 示例说明

假设语料中词频分布如下:

1
2
3
词A: 1000次
词B: 100次
词C: 10次
  • 原始分布:词A的概率为 $\dfrac{1000}{1110} \approx 90.1%$。
  • 调整后分布:词A的概率降至约 $\dfrac{1000^{0.75}}{1000^{0.75} + 100^{0.75} + 10^{0.75}} \approx 70.5%$。
  • 效果:词B和词C的采样概率显著提升,模型能更均衡地学习不同频率的词。

trainer

将原本训练的过程进行了封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import torch
import numpy as np
from torch import optim
from tqdm import tqdm

class Trainer:
def __init__(self, model, config, dataloader, word2idx, idx2word, sampler):
# 参数初始化
self.model = model
self.config = config
self.dataloader = dataloader
self.word2idx = word2idx
self.idx2word = idx2word
self.sampler = sampler

# 设备设置
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.model.to(self.device)

# 优化器设置
self.optimizer = optim.Adam(self.model.parameters(), lr=config.learning_rate)

self.loss_history = [] # 记录损失

def _train_epoch(self, epoch):
'''训练单轮epoch'''
train_loss = 0 # 初始化训练损失
progress_bar = tqdm(self.dataloader, desc=f"Epoch {epoch+1}/{self.config.epochs}") # 进度条
for batch in progress_bar:
if self.config.mode == 'skip-gram': # skip-gram模式
center, target = batch # skip-gram模式,sentences为中心词和目标词对
inputs = torch.tensor(center, dtype=torch.long).to(self.device) # 转换为tensor
else:
context, target = batch # cbow模式,sentences为上下文词构成的元组和目标词对
inputs = torch.tensor(context, dtype=torch.long).to(self.device) # 转换为tensor

targets = torch.tensor(target, dtype=torch.long).to(self.device) # 将目标词转换为tensor
batch_size = targets.size(0) # 批量大小,即中心词个数

# 负采样
negative_samples = self.sampler.sample(batch_size, self.config.num_negatives) # 进行负采样,其中负采样的个数为num_negatives,结果是一个batch_size*num_negatives的矩阵
negative_samples = torch.tensor(negative_samples, dtype=torch.long).to(self.device) # 将负采样结果转换为tensor

# 清空原本的梯度,进行前向传播
self.optimizer.zero_grad()
loss = self.model(inputs, targets, negative_samples) # 计算损失

# loss进行反向传播,更新优化器参数
loss.backward()
self.optimizer.step()

# 记录损失
train_loss += loss.item()
# progress_bar.set_postfix(loss=loss.item())

# 计算平均损失
avg_loss = train_loss / len(self.dataloader)
self.loss_history.append(avg_loss) # 记录损失

return avg_loss

def train(self):
for epoch in range(self.config.epochs):
avg_loss = self._train_epoch(epoch) # 训练单轮epoch
print(f"Epoch {epoch+1}/{self.config.epochs}, Loss: {avg_loss:.4f}")

def get_and_save(self):
# 获取词向量
embeddings = self.model.input_embeddings.weight.data.cpu().numpy()

# 保存词向量
with open(self.config.save_path, 'w', encoding='utf-8') as f:
for idx, word in self.idx2word.items():
# map 是 Python 的内置函数,它会对一个可迭代对象(如列表)中的每个元素应用指定的函数(这里是 str 函数),并返回一个新的迭代器
# join 是 Python 的内置函数,它会将序列中的元素以指定的字符连接起来,并返回一个新的字符串
vector = ' '.join(map(str, embeddings[idx]))
f.write(f"{word} {vector}\n") # 保存词以及对应的词向量
print(f"Word vectors saved to {self.config.save_path}")

# 构建词向量矩阵并进行归一化
norm_embeddings = embeddings / (np.linalg.norm(embeddings, axis=1, keepdims=True) + 1e-10)

return norm_embeddings

main

保留设置随机数和相关接口调用部分,以及最后判断词的相似度部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import torch
import random
import numpy as np
from torch.utils.data import DataLoader
from torch import optim
from utils.configs import Config
from utils.dataloader import Word2VecDataset, read_data, build_vocab
from utils.negative_sampler import NegativeSampler, get_negative_sampling_distribution
from utils.metrics import cosine_similarity, find_most_similar
from models.skip_gram import SkipGramModel
from models.cbow import CBOWModel
from trainers import Trainer

# 设置随机种子以确保结果可复现
torch.manual_seed(0)
random.seed(0)
np.random.seed(0)

def main():
# 解析命令行参数
config = Config()
print("配置参数:")
print(f"embedding_dim: {config.embedding_dim}")
print(f"window_size: {config.window_size}")
print(f"num_negatives: {config.num_negatives}")
print(f"batch_size: {config.batch_size}")
print(f"epochs: {config.epochs}")
print(f"learning_rate: {config.learning_rate}")
print(f"mode: {config.mode}")
print(f"data_path: {config.data_path}")
print(f"save_path: {config.save_path}")

# 读取数据
corpus = read_data(config.data_path)
word2idx, idx2word, word_counts = build_vocab(corpus, min_count=1) # 构建词汇表
vocab_size = len(word2idx) # 词汇表大小
print(f"Vocab size: {vocab_size}") # 打印词汇表大小

# 负采样分布
distribution = get_negative_sampling_distribution(word_counts) # 根据词频构建分布
sampler = NegativeSampler(distribution) # 根据分布构建负采样器

# 创建Dataset和DataLoader
dataset = Word2VecDataset(corpus, word2idx, config.window_size, mode=config.mode) # 创建Dataset
dataloader = DataLoader(dataset, batch_size=config.batch_size, shuffle=True) # 创建DataLoader

# 初始化模型
if config.mode == 'skip-gram':
model = SkipGramModel(vocab_size, config.embedding_dim) # 初始化Skip-gram模型
else:
model = CBOWModel(vocab_size, config.embedding_dim) # 初始化CBOW模型

# 初始化训练器
trainer = Trainer(
model=model,
config=config,
dataloader=dataloader,
word2idx=word2idx,
idx2word=idx2word,
sampler=sampler
)

# 训练模型
trainer.train()

# 保存词向量,并获得归一化的词向量
norm_embeddings = trainer.get_and_save()

word_a = '新华社'
word_b = '法新社'
if word_a in word2idx and word_b in word2idx: # 判断词是否在词汇表中
vec_a = norm_embeddings[word2idx[word_a]] # 先将词转成索引,再取出对应的词向量
vec_b = norm_embeddings[word2idx[word_b]]
similarity = cosine_similarity(vec_a, vec_b) # 计算余弦相似度
print(f"'{word_a}' 与 '{word_b}' 的相似度为: {similarity:.4f}")
else:
print(f"'{word_a}' 或 '{word_b}' 不在词汇表中。")

# 示例:查找与指定词最相似的前N个词
target_word = '搜狐'
top_n = 5
similar_words = find_most_similar(target_word, word2idx, idx2word, norm_embeddings, top_n=top_n) # 找到与target_word最相似的前N个词
if similar_words:
print(f"与 '{target_word}' 最相似的前 {top_n} 个词:")
for word, score in similar_words:
print(f"{word}: {score:.4f}")

if __name__ == "__main__":
main() # 调用main函数

为什么在计算word2vec时需要反向传播

[!question] 感觉好像在word2vec计算的时候没有见到用什么卷积层、全连接层之类的结构,为什么还需要用反向传播来优化参数?

Word2Vec 训练需要优化器和反向传播的原因在于其本质是一个基于神经网络的无监督学习模型,其目标是通过调整词向量参数来最小化损失函数。以下是详细解释:
image.png

1. Word2Vec 的神经网络本质

Word2Vec 虽然结构简单,但它是一个典型的浅层神经网络模型

  • 输入层:词的索引(one-hot 向量)。
  • 隐藏层:词向量(嵌入层),没有激活函数,也就是线性单元。
  • 输出层:预测目标词的概率分布(通过 softmax

其目标是通过训练调整词向量参数,使得模型能够根据上下文预测目标词(CBOW)或根据中心词预测上下文(Skip-gram)。优化器和反向传播是神经网络训练的核心工具,用于实现这一目标。当这个模型训练好以后,我们并不会⽤这个训练好的模型处理新的任务,我们真正需要的是这个模型通过训练数据所学得的参数,例如隐层的权重矩阵。

2. 优化器作用

优化器的职责

优化器(如代码中的 Adam)负责根据损失函数的梯度更新模型参数(即词向量权重)。具体来说:

  1. 梯度计算:通过反向传播计算损失函数对模型参数的梯度。
  2. 参数更新:根据梯度方向和优化算法(如 Adam 的动量、自适应学习率)调整参数。

Word2Vec 的参数更新

在代码中,模型参数是嵌入层的权重:

1
self.model.parameters()  # 包括 input_embeddings 和 output_embeddings 的权重

优化器通过调整这些权重,使得正样本的相似性得分更高,负样本的相似性得分更低。

3. 反向传播的作用

反向传播的流程

  1. 前向传播:计算模型的输出(损失函数)。
1
loss = self.model(inputs, targets, negative_samples)
  1. 梯度计算:反向传播(loss.backward())自动计算损失对模型参数的梯度。
1
loss.backward()  # 计算梯度
  1. 参数更新:优化器根据梯度更新参数。
1
self.optimizer.step()  # 更新参数

Word2Vec 中的反向传播

  • 词向量的梯度更新
    反向传播会调整 input_embeddingsoutput_embeddings 的权重。例如:
    • 在 CBOW 中,input_embeddings 对应上下文词向量,output_embeddings 对应目标词向量。
    • 梯度会更新这些向量,使得正样本的相似性(上下文与目标词的点积)最大化,负样本的相似性最小化。

4. 训练过程的具体实现(结合代码)

步骤1:前向传播计算损失

1
loss = self.model(inputs, targets, negative_samples)
  • 输入 inputs(上下文词或中心词)、targets(目标词)、negative_samples(负样本)。
  • 模型通过嵌入层和损失计算(正样本得分 + 负样本得分)得到总损失。

步骤2:反向传播计算梯度

1
loss.backward()
  • 自动计算损失对 input_embeddingsoutput_embeddings 权重的梯度。

步骤3:优化器更新参数

1
self.optimizer.step()
  • 根据梯度更新词向量参数,例如:
    • 增大正样本的相似性(如 context_embtarget_emb 的向量方向更接近)。
    • 降低负样本的相似性(如 context_embnegative_emb 的向量方向更远离)。

5. 为什么需要显式调用优化器和反向传播?

  • 参数学习需求
    Word2Vec 的目标是通过调整词向量参数来最小化损失函数,这必须依赖梯度下降类算法。
  • 嵌入层的可训练性
    nn.Embedding 层的权重是模型参数,需要梯度更新(通过 requires_grad=True 自动实现)。
  • 负采样的高效训练
    负采样通过近似 softmax 加速训练,但仍需梯度更新(负样本的嵌入层权重需要调整)。

6. 代码中的关键操作

梯度清零

1
self.optimizer.zero_grad()  # 清空上一批次的梯度
  • 防止梯度累积(每个 batch 独立计算梯度)。

损失计算与梯度传播

1
2
3
loss = self.model(...)  # 前向传播
loss.backward() # 反向传播
self.optimizer.step() # 参数更新
  • 这是 PyTorch 训练神经网络的通用流程。

7. 直观示例

假设模型在某个 batch 中处理以下样本:

  • 正样本(context="cat", target="climb")
  • 负样本(context="cat", negative="math")

训练过程会:

  1. 计算 context_emb("cat")target_emb("climb") 的相似性得分。
  2. 计算 context_emb("cat")negative_emb("math") 的相似性得分。
  3. 通过反向传播调整:
    • 使 input_embeddings["cat"] 更接近 output_embeddings["climb"]
    • 使 input_embeddings["cat"] 远离 output_embeddings["math"]

总结

  • 优化器:负责根据梯度更新词向量参数。
  • 反向传播:自动计算损失对参数的梯度。
  • 训练目标:通过最小化损失函数,学习到语义相关的词向量表示。
    如果没有优化器和反向传播,模型将无法自动调整词向量参数,Word2Vec 的训练过程将无法实现。

Pytorch里面的回调

[!question] 为什么我这里写的是loss = self.model(inputs, targets, negative_samples),并没有调用 forward ,但是会自动执行forward 函数?

你这里调用的是:

1
loss = self.model(inputs, targets, negative_samples)

但实际上调用的是模型类(继承自nn.Module)的forward方法。这是因为:
在PyTorch中,通过调用模型实例 (self.model(...)) 时,PyTorch内部会自动调用你定义的forward方法。

也就是说:

  • 定义的模型继承自 nn.Module
  • nn.Module 类重写了Python的 __call__ 方法。
  • 当执行model(input)时,实际上调用的是model.__call__(input),这个方法会在执行一些PyTorch的内部逻辑(例如自动计算梯度、钩子函数等)之后,自动调用你自定义的forward方法。

因此,虽然你没有明确写:

1
loss = self.model.forward(inputs, targets, negative_samples)

但由于PyTorch内部机制的关系,你写:

1
loss = self.model(inputs, targets, negative_samples)

就会自动调用模型中定义的:

1
def forward(self, context_words, target_words, negative_words):

所以,代码运行的路径是:

1
2
3
self.model(inputs, targets, negative_samples)
└─> nn.Module.__call__()
└─> your_model.forward(inputs, targets, negative_samples)

推荐做法:
通常推荐直接调用模型实例(model(x))而不是显式调用forward,因为直接调用能确保所有PyTorch内置机制正常运行。

损失函数的设计原因

Word2Vec的目标和损失函数

Word2Vec的核心思想是 通过最大化上下文和目标词的相似性,最小化负样本和上下文的相似性。这里的相似性通常通过点积(即内积)来度量。

1. Skip-gram模型:

在 Skip-gram 模型中,给定一个中心词(target word),模型的目标是预测其上下文词(context words)。假设你有一个句子:

1
I love natural language processing

如果选择 “love” 为中心词,那么目标是通过 “love” 来预测上下文词(例如: “I”, “natural”, “language”, “processing”)。

损失函数的工作原理:

  • 正样本: 模型通过目标词(例如 “love”)与其上下文词(例如 “I”, “natural”, “language”, “processing”)的相似性进行训练,目标是最大化目标词与上下文词的相似性。
  • 负样本: 为了使模型学习区分非上下文词,模型从词汇表中随机选择一些词作为负样本。通过最小化这些负样本与目标词的相似性,模型能够有效地学习如何区分真实的上下文词和随机的非上下文词。

为什么最大化目标词与上下文词的相似性?

  • 最大化目标词与上下文词的相似性是因为我们希望目标词能够很好地代表其上下文,捕捉到它们之间的语义关系。例如,如果两个词经常出现在相似的上下文中(如 “dog” 和 “cat”),它们的词向量应该非常相似。

为什么最小化负样本与上下文的相似性?

  • 负样本是从词汇表中随机选出的词,这些词应该与目标词的上下文关系较弱。通过最小化负样本与上下文词的相似性,模型能够确保只有在实际语境下出现的词才会被聚集在一起,而不是将无关的词(负样本)与目标词错误地关联。

2. CBOW模型:

CBOW模型与Skip-gram相反,它的目标是给定上下文词,来预测中心词。假设句子仍然是:

1
I love natural language processing

如果选择 “love” 为中心词,CBOW模型的目标是通过上下文词(例如:”I”, “natural”, “language”, “processing”)来预测中心词 “love”。

损失函数的工作原理:

  • 正样本: 在CBOW中,给定一组上下文词,模型通过它们来预测一个目标词。目标是最大化上下文词和目标词的相似性。
  • 负样本: 同样地,通过选择负样本,模型将学习区分真实上下文和随机无关的词。

损失函数的数学形式

Word2Vec的损失函数通常使用负对数似然损失(Negative Log-Likelihood Loss),结合负采样(Negative Sampling)。假设模型预测某个上下文的目标词是 w_o,负样本为一组随机选择的词 w_1, w_2, ..., w_k,那么总的损失可以表示为:

$$L=−log⁡P(wo∣context)−∑i=1klog⁡P(wi∣context)L = - \log P(w_o | context) - \sum_{i=1}^k \log P(w_i | context)$$

  • P(w_o | context): 表示给定上下文预测目标词的概率。通过最大化这个概率,模型希望使得目标词和上下文词之间的相似性最大化。
  • P(w_i | context): 表示给定上下文预测负样本的概率。通过最小化这个概率,模型希望使得负样本和上下文词之间的相似性最小化。

在实际计算中,P(w | context) 通过 softmax 或者 负采样(Negative Sampling) 来实现,后者是一种近似计算的方式,用于提高训练效率。

总结

最大化目标词与上下文的相似性最小化负样本与上下文的相似性 的动机是基于词语在语义上的相似性原则。通过这种方式,Word2Vec可以学习到具有良好语义表示的词向量,使得语义上相似的词具有相似的向量表示。具体来说:

  • 最大化相似性:目标是使得目标词与上下文之间的语义关系被模型所捕捉,从而使得相似语义的词在向量空间中靠得更近。
  • 最小化相似性:通过负采样,使得模型学会区分实际的上下文词和随机无关的词,从而避免错误的词语关联。
    最终,这种训练方式使得模型能够有效地将每个词映射到一个低维度的向量空间,并且能够捕捉到词语之间的语义和语法相似性。

实验结果

1742353390143.png

CBOW 实现

由于 torch 版本的不同,原本能运行的代码在电脑上跑不通…让我们来解决一下
报错的原因是 batch size不统一…让我们回到数据加载的代码中看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Word2VecDataset(Dataset):
def __init__(self, corpus, word2idx, window_size, mode='skip-gram'):
self.pairs = [] # 保存中心词和上下文词对
self.mode = mode # 设置 skip-gram 或 cbow 模式
for sentence in corpus:
# 过滤掉不在词汇表中的词,并将句子中的词索引化,得到一个一维列表
indices = [word2idx[word] for word in sentence if word in word2idx]
for center_pos, center_word in enumerate(indices):
# 定义上下文窗口
start = max(0, center_pos - window_size) # 窗口起始位置,注意这里有边界问题,如果窗口越界,则取0
end = min(len(indices), center_pos + window_size + 1) # 窗口结束位置,注意这里也有边界问题,如果窗口越界,则取序列长度
context = indices[start:center_pos] + indices[center_pos+1:end] # 上下文词列表
for context_word in context:
if mode == 'skip-gram':
self.pairs.append((center_word, context_word)) # 保存中心词和上下文词对,将中心词与每一个上下文词配对,共配对len(context)次,表示中心词预测上下文词
elif mode == 'cbow':
self.pairs.append((list(context), center_word)) # 保存上下文词列表和中心词对,注意这里用了元组,表示多个上下文词共同预测中心词
print(f"Total pairs for {mode}: {len(self.pairs)}")

def __len__(self):
return len(self.pairs) # 返回样本数

def __getitem__(self, idx):
return self.pairs[idx] # 返回第idx个配对

其中,对于skip-gram和cbow,采用了不同的配对方式:

1
2
3
4
5
6
7
8
9
# skip-gram:
[center_word, context_word1]
...
[center_word, context_wordn]

# cbow
[[context], center_word]
...
[[context], center_word]

但是[context]在边界时,可能会出现长度不一致现象,于是batch size不统一,在高版本的torch中,这个问题被隐式解决了,但是在低版本中没有得到正确处理。

通常,遇到这种情况,最常见的解决办法就是重写一个 collate_fn 函数,在dataloader构建的时候传入,这样就可以按照我们写的 collate_fn 进行处理。

什么是 collate_fn

collate_fn 是 PyTorch 的 DataLoader 中一个非常重要的参数,它用于定义如何将多个样本(从数据集中获取的)组合成一个批次(batch)。默认情况下,DataLoader 会自动将样本堆叠成一个批次,但在某些情况下,默认行为可能不适用,这时就需要重写 collate_fn

1. collate_fn 的作用

在 PyTorch 中,DataLoader 的作用是从数据集中加载数据,并将其整理成批次(batch),供模型训练或推理使用。collate_fn 是一个函数,它定义了如何将多个样本(从数据集中获取的)组合成一个批次。
默认的 collate_fn 行为是:

  • 如果样本是张量(tensor),它会将样本堆叠(stack)成一个更大的张量。
  • 如果样本是列表、元组或字典,它会递归地对每个元素进行堆叠。
    例如:
1
2
3
4
5
6
# 假设数据集返回的样本是 (tensor, label)
sample1 = (torch.tensor([1, 2, 3]), 0)
sample2 = (torch.tensor([4, 5, 6]), 1)

# 默认的 collate_fn 会将它们组合成:
batch = (torch.tensor([[1, 2, 3], [4, 5, 6]]), torch.tensor([0, 1]))

2. 为什么需要重写 collate_fn

默认的 collate_fn 假设所有样本的形状和类型是一致的,但在某些情况下,这种假设不成立,这时就需要自定义 collate_fn。以下是一些常见的场景:

(1) 样本长度不一致

例如,在处理变长序列(如文本、音频)时,每个样本的长度可能不同。默认的 collate_fn 无法直接处理这种情况。

  • 解决方法:在 collate_fn 中使用 pad_sequence 对序列进行填充(padding),使它们的长度一致。
1
2
3
4
5
6
7
8
9
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
# batch 是一个列表,每个元素是 (sequence, label)
sequences, labels = zip(*batch)
# 对序列进行填充
sequences_padded = pad_sequence(sequences, batch_first=True, padding_value=0)
labels = torch.tensor(labels)
return sequences_padded, labels

(2) 样本是复杂的数据结构

如果样本是字典、嵌套的元组或其他复杂结构,默认的 collate_fn 可能无法正确处理。

  • 解决方法:在 collate_fn 中手动定义如何组合这些复杂结构。
1
2
3
4
5
def collate_fn(batch):
# batch 是一个列表,每个元素是 {"data": tensor, "label": int}
data = torch.stack([item["data"] for item in batch])
labels = torch.tensor([item["label"] for item in batch])
return {"data": data, "label": labels}

(3) 需要自定义数据处理逻辑

有时,我们可能需要在组合批次时对数据进行额外的处理,例如数据增强、归一化等。

  • 解决方法:在 collate_fn 中添加自定义逻辑。
1
2
3
4
5
6
def collate_fn(batch):
images, labels = zip(*batch)
# 对图像进行归一化
images = torch.stack([normalize(img) for img in images])
labels = torch.tensor(labels)
return images, labels

3. 如何使用 collate_fn

在创建 DataLoader 时,将自定义的 collate_fn 传递给 collate_fn 参数即可:

1
2
3
4
from torch.utils.data import DataLoader

# 假设 dataset 是你的数据集
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

4. 总结

  • 默认行为collate_fn 默认会将样本堆叠成一个批次,适用于样本形状和类型一致的情况。
  • 重写场景:当样本长度不一致、数据结构复杂或需要自定义处理逻辑时,需要重写 collate_fn
  • 灵活性:通过自定义 collate_fn,可以灵活地处理各种数据形式,满足不同的需求。

解决办法1:直接填0

按照我们上面的介绍,为了处理这种情况,最简单的解决办法就是对不等长的样本进行填充,通常直接填0:

1
2
3
4
5
def collate_fn_cbow(batch):
contexts, targets = zip(*batch) # 把batch解包
max_len = max(len(ctx) for ctx in contexts)
padded_contexts = [ctx + [0]*(max_len - len(ctx)) for ctx in contexts] # 假设0是PAD的索引
return torch.LongTensor(padded_contexts), torch.LongTensor(targets)

这样子当然可以跑,不过有两个问题:

  • 首先0本身也是一个单词的id,让我们回到构建dataset的部分:
1
2
3
4
5
6
7
8
9
10
def build_vocab(corpus, min_count=1):
counter = Counter() # 词频统计
for sentence in corpus:
counter.update(sentence) # 统计每个词的词频,update方法可以累加计数
# 过滤低频词
vocab = {word for word, count in counter.items() if count >= min_count} # 保留词频大于等于min_count的词
word2idx = {word: idx for idx, word in enumerate(vocab)} # 词到索引的映射
idx2word = {idx: word for word, idx in word2idx.items()} # 索引到词的映射
word_counts = np.array([counter[idx2word[i]] for i in range(len(idx2word))], dtype=np.float32) # 词频数组
return word2idx, idx2word, word_counts

enumerate 默认是从0开始的,因此这里word2idx和idx2word实际上0号索引本身是有一个单词对应的。

其次,如果特别设置了0是一个特殊的token,那你用0作为上下文就会影响中心词的表达,因为真实的场景不会有特殊id,也就是说,填充的0会被视为有效上下文词,影响中心词的语义表达。

解决方法2:添加<PAD>标记,设置专门掩码

这个方法需要对build_vocab进行修改,然后再修改cbowforward函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def build_vocab(corpus, min_count=1):
counter = Counter()
for sentence in corpus:
counter.update(sentence)

# 显式添加PAD标记
special_tokens = ['<PAD>'] # 索引0
sorted_words = special_tokens + sorted(
[word for word, count in counter.items() if count >= min_count],
key=lambda x: (-counter[x], x)) # 其他词按词频排序

word2idx = {word: idx for idx, word in enumerate(sorted_words)}
idx2word = {idx: word for idx, word in enumerate(sorted_words)}
word_counts = np.array([counter.get(word, 0) for word in sorted_words], dtype=np.float32) # 从 counter 中获取 word 的词频。如果 word 不在 counter 中(例如 <PAD>),则返回默认值 0
return word2idx, idx2word, word_counts

def collate_fn_cbow(batch):
# 解包批次数据:contexts是多个上下文词列表,targets是对应的中心词
contexts, targets = zip(*batch)
# 计算当前批次中最长的上下文长度
max_len = max(len(ctx) for ctx in contexts)
# 获取PAD标记的索引(假设是0)
pad_idx = 0
# 初始化填充后的上下文列表
padded_contexts = []

# 对每个上下文进行填充
for ctx in contexts:
# 计算需要填充的长度
padding_needed = max_len - len(ctx)
# 填充PAD标记(0)
padded_ctx = ctx + [pad_idx] * padding_needed
padded_contexts.append(padded_ctx)

# 转换为Tensor(形状:[batch_size, max_len])
padded_contexts = torch.LongTensor(padded_contexts)
targets = torch.LongTensor(targets)

return padded_contexts, targets

class CBOWModel(nn.Module):
'''
前面的内容不变
'''
def forward(self, context_words, target_words, negative_words):
# context_words形状: [batch_size, context_len]
batch_size, context_len = context_words.shape

# 步骤1:创建掩码(Mask),标识哪些位置是真实的上下文词(非PAD)
# 假设PAD的索引是0,非PAD位置的掩码为True,PAD位置为False
mask = (context_words != 0) # [batch_size, context_len]

# 步骤2:获取上下文词向量
context_emb = self.input_embeddings(context_words) # [batch, context_len, embed_dim]

# 步骤3:应用掩码,将PAD位置的向量置零
# 扩展掩码的维度以匹配词向量的形状:[batch, context_len, 1]
mask_expanded = mask.unsqueeze(-1).float()
context_emb_masked = context_emb * mask_expanded # [batch, context_len, embed_dim]

# 步骤4:计算有效上下文的均值(忽略PAD)
# 求和:沿着context_len维度求和
context_emb_sum = torch.sum(context_emb_masked, dim=1) # [batch, embed_dim]

# 计算有效上下文词的数量(每个样本的context_len减去PAD的数量)
valid_word_counts = torch.sum(mask.float(), dim=1) # [batch]

# 避免除以零(如果某个样本的上下文全是PAD)
valid_word_counts = valid_word_counts.clamp(min=1e-10)

# 计算均值
context_emb_mean = context_emb_sum / valid_word_counts.unsqueeze(-1) # [batch, embed_dim]

# 步骤5:计算目标词和负样本的词向量
# 目标词和负样本通过输出嵌入层转换为向量,形状分别为 (batch_size, embedding_dim) 和 (batch_size, num_negatives, embedding_dim)。
target_emb = self.output_embeddings(target_words) # (batch, embed_dim)
negative_emb = self.output_embeddings(negative_words) # (batch, neg, embed_dim)

# 计算正样本的损失
# 分数计算:上下文向量与目标词向量的逐元素乘积之和(点积),表示两者的相似性。
positive_score = torch.sum(context_emb_mean * target_emb, dim=1) # (batch)
# 损失转换:通过sigmoid将分数映射到(0,1)区间,取对数并添加极小值(1e-10)防止数值溢出。
positive_loss = torch.log(torch.sigmoid(positive_score) + 1e-10)

# 计算负样本的损失
# 分数计算:context_emb.unsqueeze(2) 扩展维度为 (batch, embed_dim, 1),torch.bmm(批量矩阵乘法)计算每个负样本与上下文向量的点积,得到形状 (batch_size, num_negatives)
negative_score = torch.bmm(negative_emb, context_emb_mean.unsqueeze(2)).squeeze() # (batch, neg)
# 损失转换:对负样本分数取负号(因为希望负样本得分低),通过sigmoid和log计算损失,并对所有负样本求和。
negative_loss = torch.sum(torch.log(torch.sigmoid(-negative_score) + 1e-10), dim=1) # (batch)

loss = -(positive_loss + negative_loss) # (batch),负号是为了最小化loss
return loss.mean() # 平均loss

这部分的代码思路有点长,用deepseek来解释一下:

一、改进后的 collate_fn_cbow 函数

功能目标

CBOW 模式 下,每个样本的上下文词列表(context)长度可能不同(例如一个样本的上下文有2个词,另一个有4个词)。collate_fn_cbow 的作用是将这些不同长度的上下文填充到相同长度,以便批量输入模型。

示例说明

假设一个批次包含两个样本:

  • 样本1:context = [2, 3], target = 5
  • 样本2:context = [4, 1, 7], target = 6

经过 collate_fn_cbow 处理后:

  • max_len = 3(最长上下文长度)

  • 填充后的结果:

    1
    2
    3
    4
    5
    padded_contexts = [
    [2, 3, 0], # 样本1填充一个0
    [4, 1, 7] # 样本2无需填充
    ]
    targets = [5, 6]

关键点

  • 填充值:使用显式定义的 <PAD> 标记(索引为0),确保不与真实词汇冲突。
  • 输入形状:填充后的上下文张量形状为 [batch_size, max_len],可直接输入模型。

二、改进后的模型 forward 函数(以 CBOW 为例)

功能目标

在计算上下文向量的均值时,忽略填充的 <PAD> 标记,确保填充位置不影响模型学习。

示例说明

假设输入为:

1
2
3
4
context_words = [
[2, 3, 0], # 样本1(第三个位置是PAD)
[4, 1, 7] # 样本2(无PAD)
]
  • 掩码生成

    1
    2
    3
    4
    mask = [
    [True, True, False], # 样本1的第三个位置是PAD
    [True, True, True] # 样本2无PAD
    ]
  • 词向量掩码处理

    • 样本1的第三个位置的词向量会被置零,不参与求和。
  • 均值计算

    • 样本1的均值 = (向量2 + 向量3) / 2
    • 样本2的均值 = (向量4 + 向量1 + 向量7) / 3

关键点

  • 掩码机制:通过布尔掩码标识有效词位置,确保PAD位置的向量不参与计算。
  • 鲁棒性:使用 clamp(min=1e-10) 避免除零错误(即使某个样本的上下文全是PAD,也能安全计算)。

总结

  1. **collate_fn_cbow**:
    • 作用:将变长的上下文填充到相同长度,生成批量数据。
    • 关键:使用独立的 <PAD> 标记(索引0)填充,不与真实词汇冲突。
  2. 模型 forward 函数
    • 作用:通过掩码机制,在计算上下文向量时忽略填充位置。
    • 关键:利用掩码屏蔽无效位置,确保模型只关注真实的上下文词。
      通过这两项改进,代码可以正确处理 CBOW 模式的变长输入,同时避免 PAD 标记对模型训练的干扰。

最后再运行一次:
image.png