新零售行业评价决策系统

一、项目介绍【理解】

1、项目背景

  • 随着科技的迅速发展和智能设备的普及,AI技术在新零售行业中得到了广泛应用。其中 智能推荐系统 是AI技在新零售中最为常见且有效的应用之一。通过分析用户的购买历史、浏览行为以及喜好偏好,推荐系统可以根据个人特征给用户进行个性化商品推荐。这种个性化推荐不仅可以提高用户购买意愿,减少信息过载,还可以带来更高的用户满意度和销量。
  • 在智能推荐系统中,文本分类的应用属于重要的应用环节。比如:某电商网站都允许用户为商品填写评论,这些文本评论能够体现出用户的偏好以及商品特征信息,是一种语义信息丰富的隐式特征。 相比于单纯的利用显式评分特征,文本信息一方面可以弥补评分稀疏性的问题,另一方面在推荐系统的可解释方面也能够做的更好。
  • 因此,本次项目我们将 以”电商平台用户评论”为背景,基于深度学习方法实现评论文本的准确分类 ,这样做的目的是通过用户对不同商品或服务的评价,平台能够快速回应用户需求,改进产品和服务。同时,自动分类也为个性化推荐奠定基础,帮助用户更轻松地找到符合其偏好的商品。

2、评论文本分类实现方法

2.1 传统的深度学习方法

  • 目前实现文本分类的方法很多,如经典的应用于文本的卷积神经网络(Text-CNN)、循环神经网络(Text-RNN)、基于BERT等预训练模型的fine-tuning等,但是这些方法多为建立在具有大量的标注数据下的有监督学习。在很多实际场景中,由于领域特殊性和标注成本高,导致标注训练数据缺乏,模型无法有效地学习参数,从而易出现过拟合现象。因此,如何 通过小样本数据训练得到一个性能较好的分类模型 是目前的研究热点。

2.2 模型微调方法

  • 基于前面章节的介绍,我们可以借助Prompt-Tuning的技术,来实现模型部分参数的微调(当然如果模型参数较小比如BERT,也可以全量参数微调),相比传统技术方法,Prompt-Tuning方法可以实现在较少样本的训练上,就可以达到较好的结果。
  • 在本次项目中,我们将分别基于 BERT+PET(硬模版)以及BERT+P-Tuning(软模版) 两种方式实现用户评论文本的分类。重点是理解prompt的构造方法,以及promt-tuning方法的实现原理。

二、BERT+PET方式介绍【理解】

1、==PET回顾==

  • PET(PatternExploiting Training)的核心思想是:==根据先验知识人工定义模版,将目标分类任务转换为与MLM一致的完形填空,然后再去微调MLM任务参数。==

图中示例1: 情感分类任务(好评还是差评),原始文本:这家店真不错,值得推荐。PET模板: [MASK]满意。Label:不/很。标签词映射(Label Word Verbalizer):例如如果[MASK]预测的词是“不”,则认为是差评类,如果是“很”,则认为是好评类。

图中示例2:新闻分类任务(多分类),原始文本:中国女排再夺冠!PET模版:下面是[MASK] [MASK]新闻,Label:体育/财经/时政/军事


  • PET 方法的核心步骤

PET方法整体过程可以概括为:首先,将下游任务通过人工模板(pattern)转化为语言模型的填空任务,并通过verbalizer把预测的词映射到任务标签,从而用少量标注样本训练多个“子模型”;接着,这些子模型在大量未标注数据上生成伪标签,形成软标注数据;最后,通过知识蒸馏,训练一个单一的学生模型来学习多个子模型的预测分布,从而兼顾鲁棒性和泛化能力。在这里,我们只需要完成分类任务,所以只需要实现第一步即可。

具体步骤如下:

1)定义任务模式 (Task Patterns):

  • 首先,你需要将下游任务的输入和输出,转换为一种 包含空白([MASK] 或其他特殊标记)的自然语言句子模板 。这些模板被称为“模式”。
  • 示例 (情感分类):
    • 原始输入:这部电影太棒了! 标签:积极
    • POFT 模式:这部电影太棒了!这是一部____的电影。 (其中 ____ 是待填充的空白)
  • 示例 (问题回答 - 抽取式):
    • 原始输入:上下文:北京是中国的首都。问题:中国的首都是哪里? 答案:北京
    • POFT 模式:根据上下文:北京是中国的首都。中国的首都是哪里?答案是____。

2) 定义标签映射 (Verbalizer):

  • 对于任务的每个标签(或答案),你需要将其映射到 PLM(预训练语言模型) 词汇表中的一个或多个 具体词汇
  • 示例 (情感分类):
    • 积极, , 优秀
    • 消极, , 糟糕
  • 示例 (问题回答): 答案本身就是模型需要生成的词语。

3) 构造训练样本:

  • 将你的 所有有标签的训练数据 ,根据定义的模式和标签映射进行转换。
  • 对于每个样本,输入变成模式化的句子,而模型的训练目标是在空白处生成正确的 Verbalizer 词汇(或答案词汇)。

4) 全量微调 PLM:

  • 在这些 模式化转换后的训练数据 上,对 整个预训练语言模型进行全量微调
  • 微调的目标函数通常是 交叉熵损失 ,旨在最大化模型在空白处预测正确 Verbalizer 词汇的概率。这实际上是回归到 PLM 预训练时的 语言模型目标 (如掩码语言模型或文本生成)。
    • 区别于 Prompt Engineering: PFT 在这里 更新模型的所有参数 ,而不仅仅是 Prompt 向量。
    • 区别于传统 Fine-tuning: 传统 Fine-tuning 可能是在 PLM 上添加一个专门的分类头或抽取层进行微调。而 POFT 则是让 PLM 通过 预测词汇 来完成任务,更接近其预训练的方式。

5) 推理阶段:

  • 对于新的输入,同样通过模式进行转换。
  • 将转换后的输入送入微调后的 PLM。
  • 模型会在空白处生成最可能的词汇。通过 Verbalizer,将这些预测的词汇反向映射回任务的原始标签或答案。
    • 例如,如果模型预测 的概率最高,就将其映射为 积极

2、 环境准备

本项目基于 torch+ transformers 实现,运行前请安装相关依赖包:

  • python==3.10
  • transformers==4.40.2
  • torch==2.5.1+cu121
  • datasets==3.6.0
  • scikit-learn==1.7.0

3、项目架构

项目架构流程图:

项目整体代码介绍:

image-20250818044920669

三、BERT+PET方式数据预处理【理解】

  • 本项目中对数据部分的预处理步骤如下:
    1. 查看项目数据集
    2. 编写Config类项目文件配置代码
    3. 编写数据处理相关代码

1、查看项目数据集

  • 数据存放位置:llm_tuning/prompt_tasks/PET/data

  • data文件夹里面包含4个txt文档,分别为:train.txt、dev.txt、prompt.txt、verbalizer.txt


1.1 train.txt

  • train.txt为训练数据集,其部分数据展示如下:
1
2
3
4
5
6
水果	脆脆的,甜味可以,可能时间有点长了,水分不是很足。
平板 华为机器肯定不错,但第一次碰上京东最糟糕的服务,以后不想到京东购物了。
书籍 为什么不认真的检查一下, 发这么一本脏脏的书给顾客呢!
衣服 手感不错,用料也很好,不知道水洗后怎样,相信大品牌,质量过关,五星好评!!!
水果 苹果有点小,不过好吃,还有几个烂的。估计是故意的放的。差评。
衣服 掉色掉的厉害,洗一次就花了

train.txt一共包含63条样本数据,每一行用\t分开,前半部分为标签(label),后半部分为原始输入 (用户评论)。

如果想使用自定义数据训练,只需要仿照上述示例数据构建数据集即可。


1.2 dev.txt

  • dev.txt为验证数据集,其部分数据展示如下:
1
2
3
4
5
6
书籍	"一点都不好笑,很失望,内容也不是很实用"
衣服 完全是一条旧裤子。
手机 相机质量不错,如果阳光充足,可以和数码相机媲美.界面比较人性化,容易使用.软件安装简便
书籍 明明说有货,结果送货又没有了。并且也不告诉我,怎么评啊
洗浴 非常不满意,晚上洗的头发,第二天头痒痒的不行了,还都是头皮屑。
水果 这个苹果感觉是长熟的苹果,没有打蜡,不错,又甜又脆

dev.txt一共包含590条样本数据,每一行用\t分开,前半部分为标签(label),后半部分为原始输入 (用户评论)。

如果想使用自定义数据训练,只需要仿照上述示例数据构建数据集即可。

1.3 prompt.txt

  • prompt.txt为人工设定提示模版,其数据展示如下:
1
这是一条{MASK}评论:{textA}

其中,用大括号括起来的部分为「自定义参数」,可以自定义设置大括号内的值。

示例中 {MASK} 代表 [MASK] token 的位置,{textA} 代表评论数据的位置。

你可以改为自己想要的模板,例如想新增一个 {textB} 参数:

1
{textA}和{textB}是{MASK}同的意思。

1.4 verbalizer.txt

  • verbalizer.txt 主要用于定义「真实标签」到「标签预测词」之间的映射。在有些情况下,将「真实标签」作为 [MASK] 去预测可能不具备很好的语义通顺性,因此,我们会对「真实标签」做一定的映射。

  • 例如:

1
"中国爆冷2-1战胜韩国"是一则[MASK][MASK]新闻。	体育
  • 这句话中的标签为「体育」,但如果我们将标签设置为「足球」会更容易预测。

  • 因此,我们可以对「体育」这个 label 构建许多个子标签,在推理时,只要预测到子标签最终推理出真实标签即可,如下:

1
体育 -> 足球,篮球,网球,棒球,乒乓,体育
  • 项目中标签词映射数据展示如下:
1
2
3
4
5
6
7
8
9
10
电脑	电脑
水果 水果
平板 平板
衣服 衣服
酒店 酒店
洗浴 洗浴
书籍 书籍
蒙牛 蒙牛
手机 手机
电器 电器

verbalizer.txt 一共包含10个类别,上述数据中,我们使用了1对1的verbalizer, 如果想定义一对多的映射,只需要在后面用”,”分割即可, eg:

1
水果	苹果,香蕉,橘子

若想使用自定义数据训练,只需要仿照示例数据构建数据集

2、编写Config类项目文件配置代码

  • 代码路径:llm_tuning/prompt_tasks/PET/pet_config.py

  • config文件目的:配置项目常用变量,一般这些变量属于不经常改变的,比如:训练文件路径、模型训练次数、模型超参数等等

具体代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import torch
import os

base_dir = os.path.dirname(os.path.abspath(__file__))
# print(f'base_dir-->{base_dir}')

class ProjectConfig(object):
def __init__(self):
# 初始化设备配置,根据系统环境选择使用GPU或CPU
self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
# self.device = "mps:0"

# 预训练模型路径配置
self.pre_model = os.path.join(base_dir, '../../bert-base-chinese')

# 训练、验证数据集路径配置
self.train_path = os.path.join(base_dir, 'data/train.txt')
self.dev_path = os.path.join(base_dir, 'data/dev.txt')

# 提示词和标签映射文件路径配置
self.prompt_file = os.path.join(base_dir, 'data/prompt.txt')
self.verbalizer = os.path.join(base_dir, 'data/verbalizer.txt')

# 模型输入序列最大长度配置
self.max_seq_len = 256

# 设置训练的超参数
self.batch_size = 8 # 每个批次的大小,根据显存和模型大小调整
self.learning_rate = 5e-5 # 学习率,影响模型收敛速度和效果
self.weight_decay = 0 # 权重衰减,用于防止过拟合,这里不使用权重衰减
self.warmup_ratio = 0.06 # 学习率预热比例,帮助模型初期更快地学习
self.max_label_len = 2 # 最大标签长度,限制输出序列的最大长度
self.epochs = 20 # 训练的轮数,即整个数据集通过模型的次数

# 日志和验证配置
self.logging_steps = 2
self.valid_steps = 20

# 模型保存路径配置
self.save_dir = os.path.join(base_dir, 'save_model')


if __name__ == '__main__':
pc = ProjectConfig()
print(pc.prompt_file)
print(pc.pre_model)

3、编写数据处理相关代码

  • 代码路径:llm_tuning/prompt_tasks/PET/data_handle/

  • data_handle文件夹中一共包含三个py脚本:template.py、data_preprocess.py、data_loader.py

3.1 ==template.py==

  • 目的:构建固定模版类,text2id的转换
  • 思路:

image-20250823121459553

  • 定义HardTemplate类代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from transformers import AutoTokenizer
import numpy as np

from prompt_tasks.PET.pet_config import ProjectConfig


# 使用硬模板,人工定义句子和[MASK]之间的位置关系。
class HardTemplate(object):
def __init__(self, prompt: str):
'''
初始化Prompt对象的构造函数
:param prompt: prompt格式定义字符串, 表示待处理的提示模板 e.g. -> "这是一条{MASK}评论:{textA}"
'''
self.prompt = prompt # 保存原始的提示模板字符串
self.inputs_list = [] # 根据文字prompt拆分为各part的列表
self.custom_tokens = {'MASK'} # 初始化自定义token集合,至少包含'MASK' token

self.prompt_analysis() # 解析prompt模板,初始化时即对prompt进行分析处理

def prompt_analysis(self):
'''
将prompt文字模板拆解为可映射的数据结构。
Examples:
prompt -> "这是一条{MASK}评论:{textA}"
inputs_list -> ['这', '是', '一', '条', 'MASK', '评', '论', ':', 'textA']
custom_tokens -> {'textA', 'MASK'}
:return:
'''
# print(f'prompt-->{self.prompt}')
idx = 0
# 遍历提示模板字符串中的每个字符
while idx < len(self.prompt):
str_part = ''
# 如果当前字符不是'{', '}',则直接添加到输入列表中
if self.prompt[idx] not in ['{', '}']:
self.inputs_list.append(self.prompt[idx])
# 如果遇到'{',表示进入自定义字段部分
if self.prompt[idx] == '{': # 进入自定义字段
idx += 1
# 继续遍历直到遇到'}',并将自定义字段的值拼接到str_part中
while self.prompt[idx] != '}':
str_part += self.prompt[idx] # 拼接该自定义字段的值
idx += 1
# print(f'idx-->{idx}')
# 如果遇到'}',但没有对应的'{',抛出异常提示括号不匹配
elif self.prompt[idx] == '}':
raise ValueError("遇到了单独的 '}', 请检查输入的prompt。")
# 如果str_part不为空,表示已经完整地获取了一个自定义字段
if str_part:
self.inputs_list.append(str_part) # 将所有自定义字段添加到输入列表中
self.custom_tokens.add(str_part) # 将所有自定义字段存储,后续会检测输入信息是否完整
# 移动到下一个字符
idx += 1

# print(f'self.inputs_list-->{self.inputs_list}')
# print(f'self.custom_tokens-->{self.custom_tokens}')

def __call__(self,
inputs_dict: dict,
tokenizer,
mask_length,
max_seq_len=512):
'''
输入一个样本,转换为符合模板的格式。
:param inputs_dict: prompt中的参数字典, e.g. -> { 'textA': '包装不错,苹果挺甜的,个头也大。', 'MASK': '[MASK]'}
:param tokenizer: 用于encoding文本的分词器
:param mask_length: MASK token 的长度
:param max_seq_len: 最大的句子长度
:return:
dict -> {
'text': '[CLS]这是一条[MASK][MASK]评论:包装不错,苹果挺甜的,个头也大。[SEP][PAD][PAD][PAD]',
'input_ids': [101, 6821, 3221, 671, 3340, 103, 103, 6397, ..., 102, 0, 0, 0],
'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, ..., 0, 0, 0, 0],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, ..., 1, 1, 1, 1,],
'mask_position': [5, 6]
}
'''
# 定义输出格式
# 初始化一个字典对象以存储处理后的输出数据
# 该字典包含了文本数据及其对应的编码信息、注意力掩码和掩码位置等关键信息
outputs = {
# 存储原始文本数据
'text': '',
# 存储文本经过分词和数值化后的输入ID序列
'input_ids': [],
# 存储段嵌入(token type embeddings)的ID序列,用于区分不同句子
'token_type_ids': [],
# 存储注意力掩码,用于指示每个token是否应该被关注
'attention_mask': [],
# 存储掩码位置,即在输入序列中被掩码的token的位置
'mask_position': []
}

# print(f'inputs_dict-->{inputs_dict}')
# 初始化一个空字符串,用于构建最终的格式化字符串
str_formated = ''
# 遍历输入列表中的每个值
for value in self.inputs_list:
# 检查当前值是否在custom_tokens中
if value in self.custom_tokens:
# 如果当前值是'MASK',使用mask_length副本的inputs_dict中的对应值
if value == 'MASK':
str_formated += inputs_dict[value] * mask_length
else:
# 对于其他自定义值,直接添加inputs_dict中的对应值
str_formated += inputs_dict[value]
else:
# 如果当前值不是custom_tokens中的值,直接添加到格式化字符串中
str_formated += value
# 打印格式化后的字符串,用于调试和验证
# print(f'str_formated-->{str_formated}')

# 使用tokenizer对格式化后的字符串进行编码
# 编码配置包括截断、最大长度设置和填充,以满足模型输入的要求
encoded = tokenizer(text=str_formated,
truncation=True,
max_length=max_seq_len,
padding='max_length')
# print('*' * 80)
# print(f'encoded--->{encoded}')
# 将编码后的输入ID赋值给输出字典中的'input_ids'键
outputs['input_ids'] = encoded['input_ids']
# 将编码后的token类型ID赋值给输出字典中的'token_type_ids'键
outputs['token_type_ids'] = encoded['token_type_ids']
# 将编码后的注意力掩码赋值给输出字典中的'attention_mask'键
outputs['attention_mask'] = encoded['attention_mask']

# print(tokenizer.convert_ids_to_tokens(encoded['input_ids']))
# 将编码后的输入ID转换为文本,并存储到输出字典中
outputs['text'] = ''.join(tokenizer.convert_ids_to_tokens(encoded['input_ids']))
# print(f'outputs-->{outputs}')

# 将掩码标记 '[MASK]' 转换为其对应的ID
mask_token_id = tokenizer.convert_tokens_to_ids('[MASK]')
# print(f'mask_token_id-->{mask_token_id}')
# print(np.array(outputs['input_ids']) == mask_token_id)
# print(np.where(np.array(outputs['input_ids']) == mask_token_id))
# 计算并获取输入ID中'mask'标记的位置,并将其转换为列表
mask_position = np.where(np.array(outputs['input_ids']) == mask_token_id)[0].tolist()
# print(f'mask_position-->{mask_position}')
# 将计算出的mask_position添加到outputs字典中
outputs['mask_position'] = mask_position
return outputs


if __name__ == '__main__':
# 创建ProjectConfig对象以获取项目配置
pc = ProjectConfig()

# 根据预训练模型配置,加载分词器
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)

# 定义一个硬模板对象,用于构建特定格式的输入文本
hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')
# 打印硬模板的输入列表和自定义token信息,以便调试
print(f'inputs_list-->{hard_template.inputs_list}')
print(f'custom_tokens-->{hard_template.custom_tokens}')

# 使用硬模板、分词器和指定的输入字典构建一个模板实例
# 调用模板对象, 自动调用__call__方法
tep = hard_template(
inputs_dict={'textA': '包装不错,苹果挺甜的,个头也大。', 'MASK': '[MASK]'},
tokenizer=tokenizer,
mask_length=2,
max_seq_len=30
)
print(f'tep--->{tep}')


3.2 ==data_preprocess.py==

  • 目的: 将样本数据转换为模型接受的输入数据。具体来说,就是将每行数据进行处理,获取数据的标签和评论信息,然后进行处理获取输入和标签。
  • 定义数据转换方法convert_example(),代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import numpy as np
import torch
from datasets import load_dataset
# partial:把一个函数的某些参数给固定住(也就是设置默认值),返回一个新的函数,调用这个新函数会更简单
from functools import partial
from transformers import AutoTokenizer

from prompt_tasks.PET.data_handle.template import HardTemplate
from prompt_tasks.PET.pet_config import ProjectConfig


def convert_example(
examples: dict,
tokenizer,
max_seq_len: int,
max_label_len: int,
hard_template: HardTemplate,
train_mode=True,
return_tensor=False) -> dict:
'''
将样本数据转换为模型接收的输入数据。
:param examples: dict类型。训练数据样本,
e.g. -> {
"text": ['手机 这个手机也太卡了。',
'体育 世界杯为何迟迟不见宣传',
...
]}
:param tokenizer: 分词器对象
:param max_seq_len: int类型。句子的最大长度,若没有达到最大长度,则padding为最大长度
:param max_label_len: int类型。最大label长度,若没有达到最大长度,则padding为最大长度
:param hard_template: HardTemplate类型。模板类
:param train_mode: bool类型。训练阶段 or 推理阶段
:param return_tensor: bool类型。是否返回tensor类型,如不是,则返回numpy类型。
:return:
dict (str: np.array) -> tokenized_output = {
'input_ids': [[1, 47, 10, 7, 304, 3, 3, 3, 3, 47, 27, 247, 98, 105, 512, 777, 15, 12043, 2], ...],
'token_type_ids': [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ...],
'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], ...],
'mask_positions': [[5, 6, 7, 8], ...],
'mask_labels': [[2372, 3442, 0, 0], [2643, 4434, 2334, 0], ...]
}
'''
# 初始化一个字典,用于存储token化的输出信息
tokenized_output = {
'input_ids': [], # 输入文本的token ID序列
'token_type_ids': [], # token类型ID序列,用于区分不同句子的token
'attention_mask': [], # 注意力掩码序列,用于标识真实token与padding token
'mask_positions': [], # mask标签在输入序列中的位置
'mask_labels': [] # 需要预测的mask标签的真实值
}
# print(f'examples--》{examples}')
# 遍历examples中的'text'列表,获取索引和文本内容
for i, example in enumerate(examples['text']):
# 判断是否处于训练模式
if train_mode:
# print(f'example-->{example}')
# 将文本内容按制表符分割,获取标签和内容
label, content = example.strip().split('\t')
# print(f'label-->{label}')
# print(f'content-->{content}')

# 使用tokenizer对标签进行编
label_encoded = tokenizer(label, add_special_tokens=False)['input_ids']
# print(f'label_encoded-->{label_encoded}')

# 如果标签长度超过最大标签长度, 将标签编码序列的长度限制在最大标签长度内
if len(label_encoded) >= max_label_len:
label_encoded = label_encoded[:max_label_len]
# 如果标签长度小于最大标签长度, 将标签编码序列进行填充,以确保其长度与max_label_len相等
else:
# 这里使用了tokenizer的pad_token_id属性作为填充元素
# print(f'tokenizer.pad_token_id-->{tokenizer.pad_token_id}')
label_encoded = label_encoded + [tokenizer.pad_token_id] * (max_label_len - len(label_encoded))

# 将编码后的标签添加到tokenized_output字典中的'mask_labels'列表中
tokenized_output['mask_labels'].append(label_encoded)
else:
# 如果不是训练模式,直接将文本内容进行修剪并使用
content = example.strip()

# 初始化输入字典,用于准备文本数据和特殊标记
inputs_dict = {
'textA': content, # 'textA' 键对应的是后续处理的主要文本内容
'MASK': '[MASK]' # 'MASK' 键用于标识特殊的掩码标记,常用于语言模型中
}
# print(f'inputs_dict-->{inputs_dict}')

# 使用硬模板编码方法处理输入数据
# 该方法将输入数据字典、tokenizer、最大序列长度和最大标签长度作为参数
# 目的是将输入数据编码成模型所需的格式
encoded_inputs = hard_template(
inputs_dict=inputs_dict,
tokenizer=tokenizer,
max_seq_len=max_seq_len,
mask_length=max_label_len)
# print(f'encoded_inputs-->{encoded_inputs}')

# 将编码后的输入ID添加到输出字典中的input_ids列表
tokenized_output['input_ids'].append(encoded_inputs["input_ids"])
# 将编码后的token类型ID添加到输出字典中的token_type_ids列表
tokenized_output['token_type_ids'].append(encoded_inputs["token_type_ids"])
# 将编码后的注意力掩码添加到输出字典中的attention_mask列表
tokenized_output['attention_mask'].append(encoded_inputs["attention_mask"])
# 将遮罩位置信息添加到输出字典中的mask_positions列表
tokenized_output['mask_positions'].append(encoded_inputs["mask_position"])
# print(f'tokenized_output-->{tokenized_output}')

# 遍历tokenized_output字典,其中k是键,v是值
for k, v in tokenized_output.items():
# 如果return_tensor为True,将值转换为torch.LongTensor类型
if return_tensor:
tokenized_output[k] = torch.LongTensor(v)
# 否则,将值转换为numpy数组
else:
tokenized_output[k] = np.array(v)

return tokenized_output


if __name__ == '__main__':
# 创建ProjectConfig对象以获取项目配置
pc = ProjectConfig()

# 使用预训练模型的分词器进行初始化
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)

# 定义一个硬模板,用于将特定的文本结构化到模型输入中
# {MASK}用于指示模型需要预测的位置,{textA}是输入文本的占位符
hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')

# 定义示例输入,包含需要处理的文本数据
# 每个元素是一个包含类别和评论文本的字符串,用制表符分隔
examples = {"text": ['手机 这个手机也太卡了。', '体育 世界杯为何迟迟不见宣传']}
tokenized_output = convert_example(examples, tokenizer, max_seq_len=30, max_label_len=2, hard_template=hard_template)
print(f'tokenized_output-->{tokenized_output}')

print('*' * 80)

# 使用functools.partial函数创建一个部分应用函数convert_func
# 此函数基于convert_example函数,预先设置了一些参数,以便于后续的调用中简化操作
# 这样做是为了优化样本处理流程,将频繁使用的参数固定下来,提高代码复用性和灵活性
convert_func = partial(convert_example,
tokenizer=tokenizer,
hard_template=hard_template,
max_seq_len=30,
max_label_len=2)
# 加载训练数据集
# 使用ProjectConfig中定义的训练数据路径
train_dataset = load_dataset('text', data_files=pc.train_path)
print(type(train_dataset))
print(f'train_dataset-->{train_dataset}')
# print(train_dataset['train'])
# print(train_dataset['train']['text'])

# 使用map方法对训练数据集进行批量转换
# batched=True相当于将train_dataset看成一个批次的样本直接对数据进行处理,节省时间
dataset = train_dataset.map(convert_func, batched=True)
print(f'dataset-->{dataset}')

# 遍历数据集中的训练数据部分
for value in dataset['train']:
# 打印当前训练数据示例
print(value)
# 打印输入ID序列的长度
print(len(value['input_ids']))
# 打印输入ID序列的数据类型
print(type(value['input_ids']))
# 仅打印第一个训练数据示例后跳出循环
break

3.3 data_loader.py

  • 目的:定义数据加载器
  • 定义获取数据加载器的方法get_data(),代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from functools import partial
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, default_data_collator

from prompt_tasks.PET.data_handle.data_preprocess import convert_example
from prompt_tasks.PET.data_handle.template import HardTemplate
from prompt_tasks.PET.pet_config import ProjectConfig

# 实例化项目配置文件
pc = ProjectConfig()

# 使用项目配置文件中指定的预训练模型,初始化一个自动分词器
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)

def get_data():
'''
加载训练和验证数据集,并进行预处理以适应模型训练。
该函数首先读取提示模板文件,然后使用该模板创建一个硬模板对象。
接着,它加载原始数据集,并将其转换为适合模型训练的格式。
最后,它将处理后的数据集包装在DataLoader对象中,以便在训练过程中方便地访问数据。
:return: train_dataloader: 训练数据集的DataLoader对象。
dev_dataloader: 验证数据集的DataLoader对象。
'''
# 读取提示模板文件的第一行作为prompt
prompt = open(pc.prompt_file, 'r', encoding='utf-8').readlines()[0].strip() # prompt定义
# print(f'prompt-->{prompt}')

# 使用读取的prompt创建一个硬模板对象
hard_template = HardTemplate(prompt=prompt)

# 创建一个新函数,用于将示例转换为模型训练所需的格式
new_func = partial(convert_example,
tokenizer=tokenizer,
hard_template=hard_template,
max_seq_len=pc.max_seq_len,
max_label_len=pc.max_label_len)

# 加载原始文本数据集
dataset = load_dataset('text',
data_files={'train': pc.train_path, 'dev': pc.dev_path})
# print(f'dataset-->{dataset}')

# 使用新函数对数据集进行映射,进行批量处理
dataset = dataset.map(new_func, batched=True)
# print(f'dataset改变之后的-->{dataset}')

# 提取训练数据集和验证数据集
train_dataset = dataset["train"]
# print(f'train_dataset-->{train_dataset}')
# print(f'train_dataset[0]-->{train_dataset[0]}')
dev_dataset = dataset["dev"]
# print(f'dev_dataset-->{dev_dataset}')
# print('dev_dataset', dev_dataset[0])

# 使用default_data_collator将数据转换为tensor数据类型
train_dataloader = DataLoader(train_dataset,
shuffle=True,
collate_fn=default_data_collator,
batch_size=pc.batch_size)
# print(f'train_dataloader-->{train_dataloader}')
dev_dataloader = DataLoader(dev_dataset,
collate_fn=default_data_collator,
batch_size=pc.batch_size)

# 返回处理后的训练和验证数据集的DataLoader对象
return train_dataloader, dev_dataloader


if __name__ == '__main__':
# 获取训练和验证数据集的加载器
train_dataloader, dev_dataloader = get_data()
print(len(train_dataloader))
print(len(dev_dataloader))

# 遍历训练数据集加载器
for i, value in enumerate(train_dataloader):
print(f'i--->{i}')
print(f'value--->{value}')
# 打印当前数据项中'input_ids'的数据类型
print(value['input_ids'].dtype)
break

四、PET方式模型搭建与训练【实现】

  • 本项目中完成BERT+PET模型搭建、训练及应用的步骤如下(注意:因为本项目中使用的是BERT预训练模型,所以直接加载即可,无需重复搭建模型架构):
    • 1.实现模型工具类函数
    • 2.实现模型训练函数,验证函数
    • 3.实现模型预测函数

1、实现模型工具类函数

  • 目的:模型在训练、验证、预测时需要的函数
  • 代码路径:llm_tuning/prompt_tasks/PET/utils
    • utils文件夹共包含3个py脚本:verbalizer.py、metirc_utils.py以及common_utils.py

1.1 verbalizer.py

  • 目的:定义一个Verbalizer类,用于将一个主标签映射到子标签或者将子标签映射到主标签。
  • 思路:

阿甘的图片丢了

  • 具体实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# Union 是 typing 模块中定义的一个类,用于表示多个类型中的任意一种类型
from typing import Union, List
from transformers import AutoTokenizer

from prompt_tasks.PET.pet_config import ProjectConfig

pc = ProjectConfig()


# Verbalizer类,用于将一个Label对应到其子Label的映射。
class Verbalizer(object):
def __init__(self,
verbalizer_file: str,
tokenizer, max_label_len: int
):
'''
:param verbalizer_file: verbalizer文件存放地址。
:param tokenizer: 用于文本和id之间的转换。
:param max_label_len: 标签长度,若大于则截断,若小于则补齐
'''
self.tokenizer = tokenizer
self.label_dict = self.load_label_dict(verbalizer_file)
self.max_label_len = max_label_len

def load_label_dict(self, verbalizer_file: str):
'''
读取本地文件,构建verbalizer字典。
:param verbalizer_file: verbalizer文件存放地址。
:return:
dict -> {
'体育': ['篮球', '足球','网球', '排球', ...],
'酒店': ['宾馆', '旅馆', '旅店', '酒店', ...],
...
}
'''
# 初始化一个空字典,用于存储标签和子标签的关系
label_dict = {}

# 打开verbalizer文件,以只读模式,使用utf8编码
with open(verbalizer_file, 'r', encoding='utf-8') as f:
# 读取文件的每一行
for line in f:
# 移除行尾的换行符,并按制表符('\t')分割标签和子标签
label, sub_labels = line.strip().split('\t')
# 将子标签按逗号(,)分割成列表,使用set去重后再转回列表,存储到label_dict中
label_dict[label] = list(set(sub_labels.split(',')))
# 返回处理后的标签和子标签的字典
return label_dict

def find_sub_labels(self, label: Union[list, str]):
'''
通过主标签找到所有的子标签。
:param label: 标签, 文本型 或 id_list, e.g. -> '体育' or [860, 5509]
:return:
dict -> {
'sub_labels': ['足球', '网球'],
'token_ids': [[6639, 4413], [5381, 4413]]
}
'''
# 如果传入的label为id列表,则通过tokenizer转换回字符串
if type(label) == list:
# 移除label中的pad_token_id,直到label中不再包含它
while self.tokenizer.pad_token_id in label:
label.remove(self.tokenizer.pad_token_id)
# 将处理后的id列表转换为tokens,并拼接成字符串
label = ''.join(self.tokenizer.convert_ids_to_tokens(label))
# print(f'label-->{label}')
# 检查转换后的label是否在标签字典中,如果不在则抛出异常
if label not in self.label_dict:
raise ValueError(f'Lable Error: "{label}" 不在 label_dict {list(self.label_dict)}.')

# 从标签字典中获取与label对应的子标签
sub_labels = self.label_dict[label]
# print(f'sub_labels-->{sub_labels}')
# 将子标签作为结果的一个部分存储在字典中
ret = {'sub_labels': sub_labels}

# 对每个子标签进行token化,不含特殊符号
token_ids = [token_id for token_id in self.tokenizer(sub_labels, add_special_tokens=False)['input_ids']]
# print(f'token_ids-->{token_ids}')
# 遍历所有的token_ids,进行截断与补齐操作
for i in range(len(token_ids)):
# 对标签进行截断
token_ids[i] = token_ids[i][:self.max_label_len]
# 如果长度不足max_label_len,则使用pad_token_id进行补齐
if len(token_ids[i]) < self.max_label_len:
token_ids[i] = token_ids[i] + [self.tokenizer.pad_token_id] * (self.max_label_len - len(token_ids[i]))
# 将处理后的token_ids存入ret字典中
ret['token_ids'] = token_ids
return ret

def batch_find_sub_labels(self, label: List[Union[list, str]]):
'''
批量找到子标签。
:param label: 标签列表, [[4510, 5554], [860, 5509]] or ['体育', '电脑']
:return:
list -> [
{
'sub_labels': ['笔记本', '电脑'],
'token_ids': [[5011, 6381, 3315], [4510, 5554]]
},
...
]
'''
return [self.find_sub_labels(l) for l in label]

def get_common_sub_str(self,
str1: str,
str2: str
):
'''
寻找最大公共子串(连续子序列)。
:param str1: abcd
:param str2: abadbcdba
:return:
'''
# 初始化两个字符串的长度
lstr1, lstr2 = len(str1), len(str2)
# 生成0矩阵,为方便后续计算,比字符串长度多了一列,生成一个 lstr1+1 * lstr2+1 的二维矩阵
record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]
# 初始化最长匹配对应在str1中的最后一位
p = 0
# 初始化最长匹配长度
maxNum = 0
# 遍历两个字符串,寻找最长公共子串
for i in range(1, lstr1 + 1):
for j in range(1, lstr2 + 1):
# 当发现相同字符时
if str1[i - 1] == str2[j - 1]:
# 在record矩阵中记录匹配长度
record[i][j] = record[i - 1][j - 1] + 1
# 更新最长匹配长度和对应在str1中的最后一位
if record[i][j] > maxNum:
maxNum = record[i][j]
p = i

# 返回最长公共子串和其长度
return str1[p - maxNum:p], maxNum

def hard_mapping(self, sub_label: str):
'''
强匹配函数,当模型生成的子label不存在时,通过最大公共子串找到重合度最高的主label。
:param sub_label: 子label
:return: 主label
'''
# 初始化变量label和max_overlap_str,用于记录最大重叠度的标签和对应的重叠度值
label, max_overlap_str = '', 0

# 遍历标签字典,其中main_label是主标签,sub_labels是与主标签相关的子标签列表
for main_label, sub_labels in self.label_dict.items():
overlap_num = 0
# 对于每个子标签,计算它与当前推理标签之间的最长公共子串长度总和
for s_label in sub_labels:
# 累加每个子标签与当前推理标签之间的最长公共子串长度
overlap_num += self.get_common_sub_str(sub_label, s_label)[1]

# 如果当前的重叠度大于或等于之前的最大重叠度,则更新最大重叠度和对应的标签
if overlap_num >= max_overlap_str:
max_overlap_str = overlap_num
label = main_label

return label

def find_main_label(self,
sub_label: Union[list, str],
hard_mapping=True
):
'''
通过子标签找到父标签。
:param sub_label: 子标签, 文本型 或 id_list, e.g. -> '苹果' or [5741, 3362]
:param hard_mapping: 当生成的词语不存在时,是否一定要匹配到一个最相似的label。
:return:
dict -> {
'label': '水果',
'token_ids': [3717, 3362]
}
'''
# 如果传入的sub_label为id列表,则通过tokenizer转换回字符串
if type(sub_label) == list:
pad_token_id = self.tokenizer.pad_token_id
# 移除列表中的[PAD]token,避免影响后续处理
while pad_token_id in sub_label:
sub_label.remove(pad_token_id)
# 将id列表转换为对应的字符串
sub_label = ''.join(self.tokenizer.convert_ids_to_tokens(sub_label))
# print(f'sub_label-->{sub_label}')
# 初始化主标签为'无',作为未找到特定子标签时的默认值
main_label = '无'

# 遍历标签字典,寻找与子标签匹配的主标签
for label, sub_labels in self.label_dict.items():
# 检查当前子标签是否在字典中对应的子标签列表中
if sub_label in sub_labels:
# 当找到匹配时,更新主标签并终止循环
main_label = label
break
# print(f'main_label-->{main_label}')
# 如果主标签为'无'且启用了强匹配功能,则使用强匹配方法更新主标签
if main_label == '无' and hard_mapping:
main_label = self.hard_mapping(sub_label)
# print('强匹配', main_label)
ret = {
'label': main_label,
'token_ids': self.tokenizer(main_label, add_special_tokens=False)['input_ids']
}
return ret

def batch_find_main_label(self,
sub_label: List[Union[list, str]],
hard_mapping=True
):
'''
批量通过子标签找父标签。
:param sub_label: 子标签列表, ['苹果', ...] or [[5741, 3362], ...]
:param hard_mapping:
:return:
list: [
{
'label': '水果',
'token_ids': [3717, 3362]
},
...
]
'''
return [self.find_main_label(l, hard_mapping) for l in sub_label]


if __name__ == '__main__':
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
verbalizer = Verbalizer(
verbalizer_file=pc.verbalizer,
tokenizer=tokenizer,
max_label_len=2)
print(f'label_dict-->{verbalizer.label_dict}')

# 查找单个子标签
label = '电脑'
ret = verbalizer.find_sub_labels(label)
print(f'ret-->{ret}')
print('*' * 80)

# 查找多个子标签
labels = ['电脑', '衣服']
# labels = [[4510, 5554], [6132, 3302]]
result = verbalizer.batch_find_sub_labels(labels)
print(f'result-->{result}')
print('*' * 80)

# 查找单个子标签对应的父标签
# sub_label = [4510, 5554]
sub_label = '衣电'
ret = verbalizer.find_main_label(sub_label)
print(f'ret-->{ret}')
print('*' * 80)

# 查找多个子标签对应的父标签
# sub_label = ['衣服', '牛奶']
sub_label = [[6132, 3302], [5885, 4281]]
ret = verbalizer.batch_find_main_label(sub_label, hard_mapping=True)
print(f'ret-->{ret}')

1.2 common_utils.py

  • 目的:定义损失函数、将mask_position位置的token logits转换为token的id。
==损失计算思路==:

image-20250823173343428

将logits获取id的思路:

image-20250823181627322

  • 脚本里面包含两个函数:mlm_loss()以及convert_logits_to_ids()
  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import torch

def mlm_loss(logits,
mask_positions,
sub_mask_labels,
cross_entropy_criterion,
device):
'''
计算指定位置的mask token的output与label之间的cross entropy loss。
:param logits: (torch.tensor): 模型原始输出 -> (batch_size, seq_len, vocab_size)
:param mask_positions: (torch.tensor): mask token的位置 -> (batch_size, mask_label_num)
:param sub_mask_labels: (list): mask token的sub label, 由于每个label的sub_label数目不同,所以这里是个变长的list,
e.g. -> [
[[2398, 3352]],
[[2398, 3352], [3819, 3861]]
]
:param cross_entropy_criterion: (CrossEntropyLoss): CE Loss计算器
:param device: (str): cpu还是gpu
:return: CE Loss
'''
'''
获取logits的尺寸信息,为后续计算做准备
logits.size()返回一个包含三个维度的元组
第一个维度(batch_size)代表批次大小,即一次处理的数据批次包含的样本数量
第二个维度(seq_len)代表序列长度,即每个样本中包含的序列元素数量
第三个维度(vocab_size)代表词汇表大小,即每个序列元素可能的类别数量
'''
batch_size, seq_len, vocab_size = logits.size()
# print(f'模型预测结果logits-->{logits.size()}')
# print(f'mask_positions-->{mask_positions.shape}')
# print(f'sub_mask_labels-->{sub_mask_labels}')

# 初始化loss变量为None,用于后续可能的损失计算
loss = None
# 遍历 logits、sub_mask_labels 和 mask_positions 的元素
for single_value in zip(logits, sub_mask_labels, mask_positions):
# 获取当前token的 logits
single_logits = single_value[0]
# print(f'single_logits-->{single_logits.shape}') # 形状[512, 21128]
# 获取当前token的 sub_mask_labels
single_sub_mask_labels = single_value[1]
# print(f'single_sub_mask_labels-->{single_sub_mask_labels}')
# 获取当前token的 mask_positions
single_mask_positions = single_value[2]
# print(f'single_mask_positions-->{single_mask_positions}') # 形状size[2]-->具体值([5, 6])

# 从单个序列的logits中,提取出被掩码位置的logits
single_mask_logits = single_logits[single_mask_positions] # (mask_label_num, vocab_size)
# 打印被掩码位置logits的形状,以验证其是否符合预期
# print(f'single_mask_logits-->{single_mask_logits.shape}')

# 模型训练时主标签对应的所有子标签都有相似的特征值, 在计算CE Loss时,需要将每个子标签的对应的损失求平均,因此需要将预测的概率值进行扩展
# 对单个 single_mask_logits 进行扩展,使其在第一个维度上重复,以匹配 single_sub_mask_labels 的数量
# 使用repeat设置重复的倍数 (sub_label_num, mask_label_num, vocab_size)
single_mask_logits = single_mask_logits.repeat(len(single_sub_mask_labels), 1, 1)
# 打印重复后的single_mask_logits的形状,以便调试和验证重复操作的效果
# print(f'重复后的single_mask_logits-->{single_mask_logits.shape}')

# 将三维张量调整为二维,以便计算损失
single_mask_logits = single_mask_logits.reshape(-1, vocab_size) # (sub_label_num * mask_label_num, vocab_size)
# print(f'调整成二维后的single_mask_logits-->{single_mask_logits.shape}')

# 将子标签转换为张量,并调整形状以匹配模型预测的结果
single_sub_mask_labels = torch.LongTensor(single_sub_mask_labels).to(device) # (sub_label_num, mask_label_num)
# 计算损失值时真实子标签维度为1维,因此需要将其展平以匹配模型预测的结果
single_sub_mask_labels = single_sub_mask_labels.reshape(-1, 1).squeeze() # (sub_label_num * mask_label_num)
# print(f'真实子标签mask值:single_sub_mask_labels-->{single_sub_mask_labels.shape}')
# print(f'真实子标签mask值:single_sub_mask_labels-->{single_sub_mask_labels}')

# 计算当前批次所有子标签的损失
cur_loss = cross_entropy_criterion(single_mask_logits, single_sub_mask_labels)
# 计算当前批次所有子标签的平均损失
cur_loss = cur_loss / len(single_sub_mask_labels)

# 如果当前损失loss未被初始化(即为None),则将其设置为当前批次的损失cur_loss
if not loss:
loss = cur_loss
# 如果当前损失loss已经存在,则将当前批次的损失cur_loss累加到loss中
else:
loss += cur_loss

# 计算平均损失:将累计的损失loss除以批次大小batch_size
loss = loss / batch_size # (1,)
return loss

def convert_logits_to_ids(
logits: torch.tensor,
mask_positions: torch.tensor):
'''
输入Language Model的词表概率分布(LMModel的logits),将mask_position位置的token logits转换为token的id。
:param logits: (torch.tensor): model output -> (batch, seq_len, vocab_size) [8, 512, 21128]
:param mask_positions: (torch.tensor): mask token的位置 -> (batch, mask_label_num) [8, 2]
:return: 对应mask position上最大概率的推理token -> (batch, mask_label_num) [8, 2]
'''
# 获取标签的长度,mask_positions.size()返回的是一个包含维度的元组,[1]表示获取第二个维度的大小
label_length = mask_positions.size()[1]
# print(f'label_length-->{label_length}')

# 获取批次大小、序列长度和词汇表大小,logits.size()返回的是一个包含维度的元组
batch_size, seq_len, vocab_size = logits.size()

# 初始化一个空列表,用于存储重塑后的 mask_positions
mask_positions_after_reshaped = []

# print(f'mask_positions.detach().cpu().numpy().tolist()-->{mask_positions.detach().cpu().numpy().tolist()}')
# 遍历每个批次的mask_positions
for batch, mask_pos in enumerate(mask_positions.detach().cpu().numpy().tolist()):
# 遍历每个mask位置
for pos in mask_pos:
# 将批次号和序列中的mask位置结合起来,得到重塑后的mask_positions
mask_positions_after_reshaped.append(batch * seq_len + pos)
# print(f'mask_positions_after_reshaped-->{mask_positions_after_reshaped}')
# print(f'原始的logits-->{logits.shape}')

# 将原始的logits重塑为(batch_size * seq_len, vocab_size)的形状
logits = logits.reshape(batch_size * seq_len, -1) # (batch_size * seq_len, vocab_size)
# print(f'改变原始模型输出的结果形状-->{logits.shape}')

# 从重塑后的logits中,选择出被掩码位置的logits
mask_logits = logits[mask_positions_after_reshaped]
# print(f'被掩码位置的logits-->{mask_logits.shape}')

# 获取每个样本mask位置所预测的tokens
predict_tokens = mask_logits.argmax(dim=-1) # (batch * label_num)
# print(f'获取每个样本mask位置预测的tokens', predict_tokens)

# 将每个样本mask位置预测的tokens重塑为(batch, label_num)的形状
predict_tokens = predict_tokens.reshape(-1, label_length) # (batch, label_num)
# print(f'predict_tokens-->{predict_tokens}')

return predict_tokens

1.3 metirc_utils.py

  • 目的:定义(多)分类问题下的指标评估(acc, precision, recall, f1)。
  • 定义ClassEvaluator类,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from typing import List
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, f1_score
from sklearn.metrics import recall_score, confusion_matrix


class ClassEvaluator(object):
def __init__(self):
# 初始化真实结果和预测结果的列表
self.goldens = [] # 存储真实结果数据
self.predictions = [] # 存储预测结果数据

def add_batch(self,
pred_batch: List[List],
gold_batch: List[List]):
'''
添加一个batch中的prediction和gold列表,用于后续统一计算。
:param pred_batch: (list): 模型预测标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]
:param gold_batch: (list): 真实标签标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]
:return:
'''
# 确保预测批次和真实批次长度一致,这是后续处理的前提条件
assert len(pred_batch) == len(gold_batch)
# print(f'pred_batch0-->{pred_batch}')
# print(f'gold_batch0-->{gold_batch}')

# 若遇到多个子标签构成一个标签的情况
# 判断gold_batch的第一个元素是否为列表或元组类型
if type(gold_batch[0]) in [list, tuple]:
# 如果是,则将pred_batch中的每个元素转换为字符串后拼接起来
pred_batch = [''.join([str(e) for e in ele]) for ele in pred_batch]
# 同样地,也将gold_batch中的每个元素转换为字符串后拼接起来
gold_batch = [''.join([str(e) for e in ele]) for ele in gold_batch]
# print(f'pred_batch-->{pred_batch}')
# print(f'gold_batch-->{gold_batch}')

# 将真实结果的批次数据添加到self.goldens列表中
self.goldens.extend(gold_batch)
# print(f'self.goldens-->{self.goldens}')
# 将预测结果的批次数据添加到self.predictions列表中
self.predictions.extend(pred_batch)
# print(f'self.predictions-->{self.predictions}')

def compute(self, round_num=2) -> dict:
'''
根据当前类中累积的变量值,计算当前的P, R, F1。
:param round_num: (int): 计算结果保留小数点后几位, 默认小数点后2位。
:return:
dict -> {
'accuracy': 准确率,
'precision': 精准率,
'recall': 召回率,
'f1': f1值,
'class_metrics': {
'0': {
'precision': 该类别下的precision,
'recall': 该类别下的recall,
'f1': 该类别下的f1
},
...
}
}
'''
# print(f'self.goldens-->{self.goldens}')
# print(f'self.predictions-->{self.predictions}')
# 初始化类别集合、类别指标字典和结果字典,用于存储全局指标
# 将 self.goldens 和 self.predictions 的集合合并,并进行排序,结果存储在变量 classes 中。
classes = sorted(list(set(self.goldens) | set(self.predictions)))
class_metrics = {}
res = {}
# print(f'classes-->{classes}')

# 构建全局指标
# 计算并存储全局准确率
res['accuracy'] = round(accuracy_score(self.goldens, self.predictions), round_num)
# 计算并存储全局精确率
res['precision'] = round(precision_score(self.goldens, self.predictions, average='weighted'), round_num)
# 计算并存储全局召回率
res['recall'] = round(recall_score(self.goldens, self.predictions, average='weighted'), round_num)
# 计算并存储全局F1分数
res['f1'] = round(f1_score(self.goldens, self.predictions, average='weighted'), round_num)
# print(f'res-->{res}')

try:
# 计算混淆矩阵,并将其转换为numpy数组,形状为(n_class, n_class)
conf_matrix = np.array(confusion_matrix(self.goldens, self.predictions))
# print(f'conf_matrix-->{conf_matrix}')
# 确保混淆矩阵的维度与类别数量匹配
assert conf_matrix.shape[0] == len(classes)
# 遍历每个类别,计算精确度(precision)、召回率(recall)和F1分数(f1)
for i in range(conf_matrix.shape[0]):
# 计算当前类别的精确度
precision = 0 if sum(conf_matrix[:, i]) == 0 else (conf_matrix[i, i] / sum(conf_matrix[:, i]))
# 计算当前类别的召回率
recall = 0 if sum(conf_matrix[i, :]) == 0 else (conf_matrix[i, i] / sum(conf_matrix[i, :]))
# 计算当前类别的F1分数
f1 = 0 if (precision + recall) == 0 else (2 * precision * recall / (precision + recall))
# 将当前类别的精确度、召回率和F1分数保存到字典中
class_metrics[classes[i]] = {
'precision': round(precision, round_num),
'recall': round(recall, round_num),
'f1': round(f1, round_num)
}
# 将所有类别的指标保存到结果字典中
res['class_metrics'] = class_metrics
except Exception as e:
# 异常处理:当计算类别指标时发生异常,打印警告信息和相关数据
print(f'[Warning] Something wrong when calculate class_metrics: {e}')
print(f'--> goldens: {set(self.goldens)}')
print(f'--> predictions: {set(self.predictions)}')
print(f'--> diff elements: {set(self.predictions) - set(self.goldens)}')
# 将结果字典中的类别指标设置为空字典
res['class_metrics'] = {}

return res

def reset(self):
"""
重置积累的数值。
"""
self.goldens = []
self.predictions = []


if __name__ == '__main__':
metric = ClassEvaluator()
metric.add_batch(
[['财', '经'], ['财', '经'], ['体', '育'], ['体', '育'], ['计', '算', '机']],
[['体', '育'], ['财', '经'], ['体', '育'], ['计', '算', '机'], ['计', '算', '机']],
)
# metric.add_batch(
# [0, 0, 1, 1, 0],
# [1, 1, 1, 0, 0]
# )
res = metric.compute()
print(res)

2、实现模型训练函数,验证函数

  • 目的:实现模型的训练和验证

  • 脚本里面包含两个函数:model2train()和evaluate_model()

  • 代码路径:llm_tuning/prompt_tasks/PET/train.py

    代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import os
import time

import torch
from tqdm import tqdm
from transformers import AutoModelForMaskedLM, AutoTokenizer, get_scheduler

from prompt_tasks.PET.data_handle.data_loader import get_data
from prompt_tasks.PET.pet_config import ProjectConfig
from prompt_tasks.PET.utils.common_utils import mlm_loss, convert_logits_to_ids
from prompt_tasks.PET.utils.metirc_utils import ClassEvaluator
from prompt_tasks.PET.utils.verbalizer import Verbalizer

pc = ProjectConfig()


def model2train():
# 加载训练数据和验证数据
train_dataloader, dev_dataloader = get_data()

# 加载预训练模型
model = AutoModelForMaskedLM.from_pretrained(pc.pre_model).to(pc.device)
# print(f'预训练模型带MLM头的-->{model}')
# 加载分词器
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
# 加载映射词表
verbalizer = Verbalizer(verbalizer_file=pc.verbalizer,
tokenizer=tokenizer,
max_label_len=pc.max_label_len)
# print(f'verbalizer-->{verbalizer.label_dict}')

# 不需要权重衰减的参数
no_decay = ["bias", "LayerNorm.weight"]
# print(type(model.parameters()))
# 定义优化器的参数组,以便对模型的不同部分应用不同的权重衰减
optimizer_grouped_parameters = [
# 第一组参数:包含所有适用权重衰减的模型参数
{
"params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
"weight_decay": pc.weight_decay,
},
# 第二组参数:包含所有不适用权重衰减的模型参数
{
"params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
"weight_decay": 0.0,
},
]
# 初始化AdamW优化器,用于模型参数的优化
# AdamW是Adam算法的变体,加入了权重衰减(L2正则化),有助于防止过拟合
# 参数optimizer_grouped_parameters是分组的模型参数,允许对不同的参数应用不同的学习率或正则化强度
optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=pc.learning_rate)

# 根据训练轮数计算最大训练步数,以便于scheduler动态调整lr
num_update_steps_per_epoch = len(train_dataloader)
# 指定总的训练步数,它会被学习率调度器用来确定学习率的变化规律,确保学习率在整个训练过程中得以合理地调节
max_train_steps = pc.epochs * num_update_steps_per_epoch
# 计算预热阶段的训练步数,用于初始化学习率调度
warm_steps = int(pc.warmup_ratio * max_train_steps) # 预热阶段的训练步数
# 创建学习率调度器,使用线性调度策略,根据训练的进行逐步调整学习率
lr_scheduler = get_scheduler(
name='linear',
optimizer=optimizer,
num_warmup_steps=warm_steps,
num_training_steps=max_train_steps)

# 初始化损失列表,用于记录训练过程中的损失值
loss_list = []
# 记录训练开始的时间,用于计算训练时长
tic_train = time.time()
# 创建分类评估器,用于评估模型性能
metric = ClassEvaluator()
# 定义损失函数,用于计算模型预测值与真实标签之间的差异
criterion = torch.nn.CrossEntropyLoss()
# 初始化训练次数和最佳F1分数,用于跟踪训练进度和模型性能
global_step, best_f1 = 0, 0

print('开始训练:')
for epoch in range(pc.epochs):
for batch in tqdm(train_dataloader, desc='模型训练'):
# print(f'batch-->{batch}')
# 将批次数据输入模型,获取logits
logits = model(input_ids=batch['input_ids'].to(pc.device),
token_type_ids=batch['token_type_ids'].to(pc.device),
attention_mask=batch['attention_mask'].to(pc.device)).logits
# print(f'logits->{logits.shape}')

# 真实标签
mask_labels = batch['mask_labels'].numpy().tolist()
# print(f'mask_labels--->{mask_labels}')
# 提取子标签
sub_labels = verbalizer.batch_find_sub_labels(mask_labels)
# print(f'sub_labels--->{sub_labels}')
# 获取子标签的token_ids
sub_labels = [ele['token_ids'] for ele in sub_labels]
# print(f'sub_labels_token_ids--->{sub_labels}')

# 计算掩码语言模型的损失值
loss = mlm_loss(logits,
batch['mask_positions'].to(pc.device),
sub_labels,
criterion,
pc.device)
# print(f'计算损失值-->{loss}')
# 清零优化器的梯度
optimizer.zero_grad()
# 反向传播计算梯度
loss.backward()
# 更新模型参数
optimizer.step()
# 更新学习率调度器
lr_scheduler.step()

# 将损失值添加到损失列表中
loss_list.append(loss)
# 训练次数增加1
global_step += 1
# 打印训练日志
if global_step % pc.logging_steps == 0:
time_diff = time.time() - tic_train
loss_avg = sum(loss_list) / len(loss_list)
print("global step %d, epoch: %d, loss: %.5f, speed: %.2f step/s"
% (global_step, epoch, loss_avg, pc.logging_steps / time_diff))
tic_train = time.time()
# 模型验证
# 使用给定的模型、评估指标、数据加载器、分词器和标记化器进行模型评估
acc, precision, recall, f1, class_metrics = evaluate_model(model,
metric,
dev_dataloader,
tokenizer,
verbalizer)

# 打印评估结果中的精确度、召回率和F1分数
print("验证集的 precision: %.5f, recall: %.5f, F1: %.5f" % (precision, recall, f1))
# 如果当前F1分数高于最佳F1分数,则更新最佳F1分数和相关模型及分词器
if f1 > best_f1:
print(
f"最好的f1分数被更新: {best_f1:.5f} --> {f1:.5f}"
)
print(f'每种类型的Metrics为: {class_metrics}')
# 更新当前最佳的F1分数
best_f1 = f1
# 定义当前保存模型和分词器的目录
cur_save_dir = os.path.join(pc.save_dir, "model_best")
print(cur_save_dir)
# 检查并创建保存目录(如果不存在)
if not os.path.exists(cur_save_dir):
os.makedirs(cur_save_dir)
# 保存模型到指定目录
model.save_pretrained(cur_save_dir)
# 保存分词器到指定目录
tokenizer.save_pretrained(cur_save_dir)
tic_train = time.time()

print('训练结束')


def evaluate_model(model,
metric,
data_loader,
tokenizer,
verbalizer):
'''
在测试集上评估当前模型的训练效果。
:param model: 当前模型
:param metric: 评估指标类(metric)
:param data_loader: 测试集的dataloader
:param tokenizer: 分词器
:param verbalizer: 映射表
:return:
'''
model.eval()
metric.reset()

with torch.no_grad():
for step, batch in enumerate(tqdm(data_loader, desc='模型验证')):
# print(f'batch-->{batch}')
logits = model(input_ids=batch['input_ids'].to(pc.device),
token_type_ids=batch['token_type_ids'].to(pc.device),
attention_mask=batch['attention_mask'].to(pc.device)).logits
# print(f'验证集模型预测的结果————>{logits.shape}')

mask_labels = batch['mask_labels'].numpy().tolist() # (batch, label_num)
# print(f"mask_labels-0-->{mask_labels}")

for i in range(len(mask_labels)): # 去掉label中的[PAD] token
while tokenizer.pad_token_id in mask_labels[i]:
mask_labels[i].remove(tokenizer.pad_token_id)
# print(f'mask_labels-1-->{mask_labels}')
# 将mask_labels id转换为文字
mask_labels = [''.join(tokenizer.convert_ids_to_tokens(t)) for t in mask_labels]
# print(f'真实的结果主标签:mask_labels_str-->{mask_labels}')

# 获取模型预测的子标签
predictions = convert_logits_to_ids(logits,
batch['mask_positions']).cpu().numpy().tolist() # (batch, label_num)
# print(f'模型预测的子标签的结果-->{predictions}')

# 根据模型预测的子标签,找到子label属于的主label
predictions = verbalizer.batch_find_main_label(predictions) # 找到子label属于的主label
# print(f"找到模型预测的子标签对应的主标签的结果-->{predictions}')")

# 获得预测的主标签名
predictions = [ele['label'] for ele in predictions]
# print(f"只获得预测的主标签的结果string-->{predictions}')")

# 调用add_batch方法, 将模型预测的主标签与真实主标签保存到metric属性中
metric.add_batch(pred_batch=predictions, gold_batch=mask_labels)
eval_metric = metric.compute()
model.train()

return eval_metric['accuracy'], eval_metric['precision'], \
eval_metric['recall'], eval_metric['f1'], \
eval_metric['class_metrics']


if __name__ == '__main__':
model2train()

  • 输出结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
.....
global step 40, epoch: 4, loss: 0.62105, speed: 1.27 step/s
Evaluation precision: 0.78000, recall: 0.77000, F1: 0.76000
Each Class Metrics are: {'书籍': {'precision': 0.97, 'recall': 0.82, 'f1':
0.89}, '平板': {'precision': 0.57, 'recall': 0.84, 'f1': 0.68}, '手机':
{'precision': 0.0, 'recall': 0.0, 'f1': 0}, '水果': {'precision': 0.95,
'recall': 0.81, 'f1': 0.87}, '洗浴': {'precision': 0.7, 'recall': 0.71, 'f1':
0.7}, '电器': {'precision': 0.0, 'recall': 0.0, 'f1': 0}, '电脑': {'precision':
0.86, 'recall': 0.38, 'f1': 0.52}, '蒙牛': {'precision': 1.0, 'recall': 0.68,
'f1': 0.81}, '衣服': {'precision': 0.71, 'recall': 0.91, 'f1': 0.79}, '酒店':
{'precision': 1.0, 'recall': 0.88, 'f1': 0.93}}
global step 50, epoch: 6, loss: 0.50076, speed: 1.23 step/s
global step 60, epoch: 7, loss: 0.41744, speed: 1.23 step/s
...
global step 390, epoch: 48, loss: 0.06674, speed: 1.20 step/s
global step 400, epoch: 49, loss: 0.06507, speed: 1.21 step/s
Evaluation precision: 0.78000, recall: 0.76000, F1: 0.75000

  • 结论: BERT+PET模型在训练集上的表现是精确率=78%
  • 注意:本项目中只用了60条样本,在接近600条样本上精确率就已经达到了78%,如果想让指标更高,可以扩增样本。

3、实现模型预测函数

  • 目的:加载训练好的模型并测试效果

  • 代码路径:llm_tuning/prompt_tasks/PET/inference.py

    代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import os
import time
from typing import List
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM

from prompt_tasks.PET.data_handle.data_preprocess import convert_example
from prompt_tasks.PET.data_handle.template import HardTemplate
from prompt_tasks.PET.pet_config import ProjectConfig
from prompt_tasks.PET.utils.common_utils import convert_logits_to_ids
from prompt_tasks.PET.utils.verbalizer import Verbalizer

pc = ProjectConfig()

model_path = os.path.join(pc.save_dir, 'model_best')
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForMaskedLM.from_pretrained(model_path).to(pc.device)
model.eval()

max_label_len = 2 # 标签最大长度
verbalizer = Verbalizer(
verbalizer_file=pc.verbalizer,
tokenizer=tokenizer,
max_label_len=max_label_len)
prompt = open(pc.prompt_file, 'r', encoding='utf8').readlines()[0].strip() # prompt定义
print(f'提示词--> {prompt}')
hard_template = HardTemplate(prompt=prompt) # 模板转换器定义


def inference(contents: List[str]):
'''
推理函数,输入原始句子,输出mask label的预测值。
:param contents:
:return: 描原始句子列表。
'''
with (torch.no_grad()):
start_time = time.time()

# 将内容封装为示例字典,准备进行标记化处理
examples = {'text': contents}

# 对示例进行标记化处理,返回标记化输出
tokenized_output = convert_example(
examples,
tokenizer,
hard_template=hard_template,
max_seq_len=128,
max_label_len=max_label_len,
train_mode=False,
return_tensor=True)

# 使用模型进行预测,获取logits
logits = model(input_ids=tokenized_output['input_ids'].to(pc.device),
token_type_ids=tokenized_output['token_type_ids'].to(pc.device),
attention_mask=tokenized_output['attention_mask'].to(pc.device)).logits

# 将logits转换为预测标签
predictions = convert_logits_to_ids(logits, tokenized_output['mask_positions']
).cpu().numpy().tolist() # (batch, label_num)

# 找到子label属于的主label
predictions = verbalizer.batch_find_main_label(predictions)

# 提取预测的标签
predictions = [ele['label'] for ele in predictions]

used = time.time() - start_time
print(f'耗时 {used} 秒。')
return predictions


if __name__ == '__main__':
contents = [
'天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐',
'环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐',
"物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小",
"福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多来点其他小点,饼干一直也是大爱,那天好像也没看到",
"服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适,晚上休息很安逸,隔音效果不错赞,下次还会来"
]
print("针对下面的文本评论,请分别给出对应所属类别:")
res = inference(contents)
print('推断的类别为:', res)
new_dict = {}
for i in range(len(contents)):
new_dict[contents[i]] = res[i]
print(f'new_dict-->{new_dict}')
  • 结果展示
1
2
3
4
5
6
7
8
9
10
11
12
13
{
'天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出
行,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐': '酒店',
'环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不
错,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐': '酒店',
'物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小': '平板',
'福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了
,之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做
柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多来
点其他小点,饼干一直也是大爱,那天好像也没看到': '水果',
'服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒
适,晚上休息很安逸,隔音效果不错赞,下次还会来': '酒店'
}

五、BERT+P-Tuning方式介绍【理解】

1、P-Tuning回顾

  • P-Tuning(Pattern-Tuning)是一种连续空间可学习模板,PET的目的解决PET的缺点,使用可学习的向量作为伪模板,不再手动构建模板。

以新闻分类任务为例:原始文本:中国女排再夺冠!P-Tuning可学习模板:[u1] [u2] …[MASK]…[un], Label:体育/财经/时政/军事

  • P-tuning 的核心思想是:用一个小的可训练模块把一组“连续提示向量”生成并插入到原始输入 embedding 中,令冻结的预训练模型在下游任务上产生正确输出,训练时仅更新 prompt encoder(或提示向量),从而实现低成本高效的调优。

2、环境准备

本项目基于 pytorch + transformers 实现,运行前请安装相关依赖包:


  • python==3.10
  • transformers==4.40.2
  • torch==2.5.1+cu121
  • datasets==3.6.0
  • scikit-learn==1.7.0

3、项目架构

  • 项目架构流程图:
  • 项目整体代码介绍:

image-20250821011025805

六、BERT+P-Tuning方式数据预处理【理解】

本项目中对数据部分的预处理步骤如下:

  • 1.查看项目数据集
  • 2.编写Config类项目文件配置代码
  • 3.编写数据处理相关代码

1、查看项目数据集

  • 数据存放位置:llm_tuning/prompt_tasks/P-Tuning/data

  • data文件夹里面包含3个txt文档,分别为:train.txt、dev.txt、verbalizer.txt


1.1 train.txt

  • train.txt为训练数据集,其部分数据展示如下:
1
2
3
4
5
6
水果	脆脆的,甜味可以,可能时间有点长了,水分不是很足。
平板 华为机器肯定不错,但第一次碰上京东最糟糕的服务,以后不想到京东购物了。
书籍 为什么不认真的检查一下, 发这么一本脏脏的书给顾客呢!
衣服 手感不错,用料也很好,不知道水洗后怎样,相信大品牌,质量过关,五星好评!!!
水果 苹果有点小,不过好吃,还有几个烂的。估计是故意的放的。差评。
衣服 掉色掉的厉害,洗一次就花了

train.txt一共包含63条样本数据,每一行用\t分开,前半部分为标签(label),后半部分为原始输入 (用户评论)。

如果想使用自定义数据训练,只需要仿照上述示例数据构建数据集即可。


1.2 dev.txt

  • dev.txt为验证数据集,其部分数据展示如下:
1
2
3
4
5
6
书籍	"一点都不好笑,很失望,内容也不是很实用"
衣服 完全是一条旧裤子。
手机 相机质量不错,如果阳光充足,可以和数码相机媲美.界面比较人性化,容易使用.软件安装简便
书籍 明明说有货,结果送货又没有了。并且也不告诉我,怎么评啊
洗浴 非常不满意,晚上洗的头发,第二天头痒痒的不行了,还都是头皮屑。
水果 这个苹果感觉是长熟的苹果,没有打蜡,不错,又甜又脆

dev.txt一共包含417条样本数据,每一行用\t分开,前半部分为标签(label),后半部分为原始输入 (用户评论)。

如果想使用自定义数据训练,只需要仿照上述示例数据构建数据集即可。

1.3 verbalizer.txt

  • verbalizer.txt 主要用于定义「真实标签」到「标签预测词」之间的映射。在有些情况下,将「真实标签」作为 [MASK] 去预测可能不具备很好的语义通顺性,因此,我们会对「真实标签」做一定的映射。

  • 例如:

1
"中国爆冷2-1战胜韩国"是一则[MASK][MASK]新闻。	体育
  • 这句话中的标签为「体育」,但如果我们将标签设置为「足球」会更容易预测。

  • 因此,我们可以对「体育」这个 label 构建许多个子标签,在推理时,只要预测到子标签最终推理出真实标签即可,如下:

1
体育 -> 足球,篮球,网球,棒球,乒乓,体育
  • 项目中标签词映射数据展示如下:
1
2
3
4
5
6
7
8
9
10
电脑	电脑
水果 水果
平板 平板
衣服 衣服
酒店 酒店
洗浴 洗浴
书籍 书籍
蒙牛 蒙牛
手机 手机
电器 电器

verbalizer.txt 一共包含10个类别,上述数据中,我们使用了1对1的verbalizer, 如果想定义一对多的映射,只需要在后面用”,”分割即可, eg:

1
水果	苹果,香蕉,橘子

若想使用自定义数据训练,只需要仿照示例数据构建数据集

2、编写Config类项目文件配置代码

  • 代码路径:llm_tuning/prompt_tasks/P-Tuning/ptune_config.py

  • config文件目的:配置项目常用变量,一般这些变量属于不经常改变的,比如:训练文件路径、模型训练次数、模型超参数等等

具体代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import torch
import os

base_dir = os.path.dirname(os.path.abspath(__file__))
# print(f'base_dir-->{base_dir}')


class ProjectConfig(object):
def __init__(self):
# 设置设备为CUDA:0(如果可用),否则设置为CPU
self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
# self.device = "mps:0"
# 设置预训练模型的路径
self.pre_model = os.path.join(base_dir, '../../bert-base-chinese')
# 设置训练数据集的路径
self.train_path = os.path.join(base_dir, 'data/train.txt')
# 设置验证数据集的路径
self.dev_path = os.path.join(base_dir, 'data/dev.txt')
# 设置verbalizer文件的路径,用于将标签映射到文本
self.verbalizer = os.path.join(base_dir, 'data/verbalizer.txt')
# 设置最大序列长度
self.max_seq_len = 512
# 设置批量大小
self.batch_size = 8
# 设置学习率
self.learning_rate = 5e-5
# 权重衰减系数
self.weight_decay = 0
# 学习率预热的系数
self.warmup_ratio = 0.06
# 伪token的个数
self.p_embedding_num = 6
# 最大标签长度
self.max_label_len = 2
# 设置训练的轮数
self.epochs = 50
# 设置日志记录的步数
self.logging_steps = 10
# 设置验证的步数
self.valid_steps = 20
# 设置保存模型的目录
self.save_dir = os.path.join(base_dir, 'save_model')


if __name__ == '__main__':
pc = ProjectConfig()
print(pc.verbalizer)

3、编写数据处理相关代码

  • 代码路径:llm_tuning/prompt_tasks/P-Tuning/data_handle/

  • data_handle文件夹中一共包含两个py脚本:data_preprocess.py、data_loader.py


3.1 data_preprocess.py

  • 目的: 将模板与原始输入文本进行拼接,构造模型接受的输入数据
  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import torch
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer

from prompt_tasks.P_Tuning.ptune_config import ProjectConfig


def convert_example(
examples: dict,
tokenizer,
max_seq_len: int,
max_label_len: int,
p_embedding_num=6,
train_mode=True,
return_tensor=False
) -> dict:
'''
将样本数据转换为模型接收的输入数据。
:param examples: (dict): 训练数据样本
e.g. -> {
"text": [
'娱乐 嗨放派怎么停播了',
'体育 世界杯为何迟迟不见宣传',
...
]
}
:param tokenizer: 分词器
:param max_seq_len: 最大句子长度
:param max_label_len: (int): 最大label长度,若没有达到最大长度,则padding为最大长度
:param p_embedding_num: (int): p-tuning token 的个数
:param train_mode: 训练阶段 or 推理阶段
:param return_tensor: 是否返回tensor类型,如不是,则返回numpy类型。
:return:
dict (str: np.array) -> tokenized_output = {
'input_ids': [[101, 3928, ...], [101, 4395, ...]],
'token_type_ids': [[0, 0, ...], [0, 0, ...]],
'mask_positions': [[5, 6, ...], [3, 4, ...]],
'mask_labels': [[183, 234], [298, 322], ...]
}
'''
tokenized_output = {
'input_ids': [],
'attention_mask': [],
'mask_positions': [], # 记录label的位置(即MASK Token的位置)
'mask_labels': [] # 记录MASK Token的原始值(即Label值)
}

# print(f"examples['text']-->{examples['text']}")
# 遍历文本数据集,其中每个文本数据被赋予一个索引和值
for i, example in enumerate(examples['text']):
try:
# 将 prompt token(s) 插在 [CLS] 之后
start_mask_position = 1
if train_mode:
# print(f"example-->{example}")
# strip() 方法用于移除字符串头尾指定的字符(默认为空格),这里用于去除可能存在的多余空格
# split('\t', 1) 方法用于按照制表符('\t')将字符串分割成两部分,限制分割次数为1,确保只分割第一个制表符
label, content = example.strip().split('\t', 1)
# print(f'label-->{label}')
# print(f'content-->{content}')

# 使用tokenizer对标签进行编码,label token 转 id
mask_labels = tokenizer(text=label)
# print(f'mask_labels-->{mask_labels}')
# 从字典中获取input_ids,并丢掉[CLS]和[SEP]
mask_labels = mask_labels['input_ids'][1:-1]
# 将 label 长度限制为最长
mask_labels = mask_labels[:max_label_len]
# 将 label 补到最长
mask_labels += [tokenizer.pad_token_id] * (max_label_len - len(mask_labels)) # 将 label 补到最长
# print(f'mask_labels-->{mask_labels}')
# 将编码后的标签添加到tokenized_output字典中的'mask_labels'列表中
tokenized_output['mask_labels'].append(mask_labels)
else:
# 如果不是训练模式,直接将文本内容进行修剪并使用
content = example.strip()
encoded_inputs = tokenizer(text=content,truncation=True,max_length=max_seq_len,padding='max_length')
except:
continue

# 获取编码后的输入
input_ids = encoded_inputs['input_ids']
# print(f'input_ids-->{input_ids}')
# print(f'原始的input_id的长度-->{len(input_ids)}')

# 1.生成 MASK Tokens, 和label长度一致
mask_tokens = ['[MASK]'] * max_label_len
# print(f'mask_tokens-->{mask_tokens}')
# 将 MASK Tokens 转为 id
mask_ids = tokenizer.convert_tokens_to_ids(mask_tokens)
# print(f'mask_ids-->{mask_ids}')

# 2.构建 prompt token(s)
# 根据p_embedding_num生成对应的特殊token列表
p_tokens = ["[unused{}]".format(i + 1) for i in range(p_embedding_num)]
# print(f'p_tokens-->{p_tokens}')
# token 转 id
p_tokens_ids = tokenizer.convert_tokens_to_ids(p_tokens)
# print(f'p_tokens_ids-->{p_tokens_ids}')

# 根据 最大长度-p_token长度-label长度-1,裁剪content的长度 (裁剪[SEP]前的token, 所以-1)
tmp_input_ids = input_ids[:max_seq_len - len(mask_ids) - len(p_tokens_ids) - 1]
# print(f'tmp_input_ids1-->{tmp_input_ids}')
# print(f'tmp_input_ids1-->{len(tmp_input_ids)}')

# 3.插入 MASK -> [CLS][MASK][MASK]世界杯...[SEP]
tmp_input_ids = tmp_input_ids[:start_mask_position] + mask_ids + tmp_input_ids[start_mask_position:] + [input_ids[-1]]
# print(f'插入mask和sep之后的 tmp_input_ids--{tmp_input_ids}')
# print(f'插入mask和sep之后的 tmp_input_ids长度--{len(tmp_input_ids)}')

# 4.插入 prompt -> [unused1][unused2]...[CLS][MASK]...[SEP]
input_ids = p_tokens_ids + tmp_input_ids
# print(f'插入模版之后的 input_ids-->{input_ids}')
# print(f'插入模版之后的 input_ids长度-->{len(input_ids)}')

# 将新的输入添加到tokenized_output字典中
tokenized_output['input_ids'].append(input_ids)

# 将 Mask Tokens 的位置记录下来
mask_positions = [len(p_tokens_ids) + start_mask_position + i
for i in range(max_label_len)]
# print(f'mask_positions-->{mask_positions}')
# 将 Mask Tokens 的位置记录下来
tokenized_output['mask_positions'].append(mask_positions)

# 如果输入需要token_type_ids,可以进行添加
if 'token_type_ids' in encoded_inputs: # 兼容不需要 token_type_id 的模型, e.g. Roberta-Base
tmp = encoded_inputs['token_type_ids']
if 'token_type_ids' not in tokenized_output:
tokenized_output['token_type_ids'] = [tmp]
else:
tokenized_output['token_type_ids'].append(tmp)

# print(f'原始的attention_mask-->{encoded_inputs["attention_mask"]}')
# 修改attention_mask的0和1的位置,因为插入了prompt和MASK Tokens,影响了原来的句子padding的部分,所以需要重新生成
attention_mask = get_attention_mask(input_ids)
# print(f'修改的attention_mask-->{attention_mask}')
tokenized_output['attention_mask'].append(attention_mask)

# break

# 遍历tokenized_output字典,其中k是键,v是值
for k, v in tokenized_output.items():
# 如果return_tensor为True,将值转换为torch.LongTensor类型
if return_tensor:
tokenized_output[k] = torch.LongTensor(v)
# 否则,将值转换为numpy数组
else:
tokenized_output[k] = np.array(v)

return tokenized_output


def get_attention_mask(alist):
'''
生成注意力掩码。
对于输入的列表,将其中的每个元素与0进行比较,如果元素大于0,则在输出列表中对应位置设置为1,否则设置为0。
这个函数的目的是为了创建一个掩码,用于在注意力机制中指示哪些位置是有效的(即大于0),哪些位置是无效的(即等于0)。
:param alist: (list): 一个包含数字的列表,用于生成注意力掩码。
:return: list: 一个与输入列表长度相同的列表,其中原始列表中大于0的位置被设置为1,等于0的位置被设置为0。
'''
# 使用numpy的where函数来创建掩码:元素大于0则输出1,否则输出0
new_a = np.where(np.array(alist) > 0, 1, 0)
# 将生成的掩码数组转换回列表格式并返回
return new_a.tolist()


if __name__ == '__main__':
pc = ProjectConfig()
train_dataset = load_dataset('text', data_files={'train': pc.train_path})
print(type(train_dataset))
# print(train_dataset)
# print(train_dataset['train']['text'])
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
tokenized_output = convert_example(examples=train_dataset['train'],
tokenizer=tokenizer,
max_seq_len=20,
max_label_len=2,
p_embedding_num=6,
train_mode=True,
return_tensor=False)
print(tokenized_output)
print(type(tokenized_output['mask_positions']))

打印结果展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{'input_ids': array([[   1,    2,    3, ..., 1912, 6225,  102],
[ 1, 2, 3, ..., 3300, 5741, 102],
[ 1, 2, 3, ..., 6574, 7030, 0],
...,
[ 1, 2, 3, ..., 8024, 2571, 0],
[ 1, 2, 3, ..., 3221, 3175, 102],
[ 1, 2, 3, ..., 5277, 3688, 102]]),
'attention_mask': array([[1, 1, 1, ..., 1, 1, 1],
[1, 1, 1, ..., 1, 1, 1],
[1, 1, 1, ..., 1, 1, 0],
...,
[1, 1, 1, ..., 1, 1, 0],
[1, 1, 1, ..., 1, 1, 1],
[1, 1, 1, ..., 1, 1, 1]]),
'mask_positions': array([[7, 8],
[7, 8],
[7, 8],
...,
[7, 8],
[7, 8],
[7, 8]]),
'mask_labels': array([[4510, 5554],
[3717, 3362],
[2398, 3352],
...,
[3819, 3861],
[6983, 2421],
[3819, 3861]]),
'token_type_ids': array([[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
...,
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0]])}

3.2 data_loader.py

  • 目的:定义数据加载器

  • 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from functools import partial
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, default_data_collator

from prompt_tasks.P_Tuning.data_handle.data_preprocess import convert_example
from prompt_tasks.P_Tuning.ptune_config import ProjectConfig

# 实例化项目配置文件
pc = ProjectConfig()

# 使用项目配置文件中指定的预训练模型,初始化一个自动分词器
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)


def get_data():
'''
加载并处理数据集。
该函数从指定路径加载训练和开发数据集,并将它们转换为适合模型训练的格式。
使用Hugging Face的`load_dataset`函数加载数据,然后使用`partial`函数创建一个带有固定参数的新函数`new_func`,
用于转换数据集示例。转换后的数据集使用`DataLoader`包装,以便于批量处理和训练过程中使用。
:return: train_dataloader: 训练数据加载器。
dev_dataloader: 测试数据加载器。
'''
# 加载数据集,包括训练和测试集
dataset = load_dataset('text', data_files={'train': pc.train_path,
'dev': pc.dev_path})
# print(f'dataset-->{dataset}')

# 创建一个带有固定参数的函数,用于转换数据集示例
new_func = partial(convert_example,
tokenizer=tokenizer,
max_seq_len=pc.max_seq_len,
max_label_len=pc.max_label_len,
p_embedding_num=pc.p_embedding_num)

# 应用转换函数到数据集上
dataset = dataset.map(new_func, batched=True)

# 分离训练和测试数据集
train_dataset = dataset["train"]
# print(f'train_dataset-->{train_dataset}')
dev_dataset = dataset["dev"]
# print(f'dev_dataset-->{dev_dataset}')

# 创建训练数据加载器
train_dataloader = DataLoader(train_dataset,
shuffle=True,
collate_fn=default_data_collator,
batch_size=pc.batch_size)

# 创建测试数据加载器
dev_dataloader = DataLoader(dev_dataset,
collate_fn=default_data_collator,
batch_size=pc.batch_size)

# 返回训练和测试数据加载器
return train_dataloader, dev_dataloader


if __name__ == '__main__':
# 加载训练和测试数据
train_dataloader, dev_dataloader = get_data()

# 打印训练和测试数据加载器的长度
print(f'len(train_dataloader)-->{len(train_dataloader)}')
print(f'len(dev_dataloader)-->{len(dev_dataloader)}')

# 遍历训练数据加载器,查看数据
for i, value in enumerate(train_dataloader):
print(value)
# 打印输入ID的Tensor类型
print(value['input_ids'].dtype)
break

打印结果展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{'input_ids': tensor([[1, 2, 3,  ..., 0, 0, 0],
[1, 2, 3, ..., 0, 0, 0],
[1, 2, 3, ..., 0, 0, 0],
...,
[1, 2, 3, ..., 0, 0, 0],
[1, 2, 3, ..., 0, 0, 0],
[1, 2, 3, ..., 0, 0, 0]]),
'attention_mask': tensor([[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0],
...,
[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0]]),
'mask_positions': tensor([[7, 8],
[7, 8],
[7, 8],
[7, 8],
[7, 8],
[7, 8],
[7, 8],
[7, 8]]),
'mask_labels': tensor([[6132, 3302],
[3717, 3362],
[6132, 3302],
[6983, 2421],
[6983, 2421],
[6132, 3302],
[3717, 3362],
[2398, 3352]]),
'token_type_ids': tensor([[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
...,
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0]])}
torch.int64

七、BERT+P-Tuning方式模型搭建与训练【实现】

本项目中完成BERT+P-Tuning模型搭建、训练及应用的步骤如下(注意:因为本项目中使用的是BERT预训练模型,所以直接加载即可,无需重复搭建模型架构):

  • 1.实现模型工具类函数
  • 2.实现模型训练函数,验证函数
  • 3.实现模型预测函数

1、实现模型工具类函数

  • 目的:模型在训练、验证、预测时需要的函数
  • 代码路径:llm_tuning/prompt_tasks/P_Tuning/utils
  • utils文件夹共包含3个py脚本:verbalizer.py、metirc_utils.py以及common_utils.py

1.1 verbalizer.py

  • 目的:定义一个Verbalizer类,用于将一个Label对应到其子Label的映射。
  • 具体代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# Union 是 typing 模块中定义的一个类,用于表示多个类型中的任意一种类型
from typing import Union, List
from transformers import AutoTokenizer

from prompt_tasks.P_Tuning.ptune_config import ProjectConfig

pc = ProjectConfig()


# Verbalizer类,用于将一个Label对应到其子Label的映射。
class Verbalizer(object):
def __init__(self,
verbalizer_file: str,
tokenizer, max_label_len: int
):
'''
:param verbalizer_file: verbalizer文件存放地址。
:param tokenizer: 用于文本和id之间的转换。
:param max_label_len: 标签长度,若大于则截断,若小于则补齐
'''
self.tokenizer = tokenizer
self.label_dict = self.load_label_dict(verbalizer_file)
self.max_label_len = max_label_len

def load_label_dict(self, verbalizer_file: str):
'''
读取本地文件,构建verbalizer字典。
:param verbalizer_file: verbalizer文件存放地址。
:return:
dict -> {
'体育': ['篮球', '足球','网球', '排球', ...],
'酒店': ['宾馆', '旅馆', '旅店', '酒店', ...],
...
}
'''
# 初始化一个空字典,用于存储标签和子标签的关系
label_dict = {}

# 打开verbalizer文件,以只读模式,使用utf8编码
with open(verbalizer_file, 'r', encoding='utf-8') as f:
# 读取文件的每一行
for line in f:
# 移除行尾的换行符,并按制表符('\t')分割标签和子标签
label, sub_labels = line.strip().split('\t')
# 将子标签按逗号(,)分割成列表,使用set去重后再转回列表,存储到label_dict中
label_dict[label] = list(set(sub_labels.split(',')))
# 返回处理后的标签和子标签的字典
return label_dict

def find_sub_labels(self, label: Union[list, str]):
'''
通过主标签找到所有的子标签。
:param label: 标签, 文本型 或 id_list, e.g. -> '体育' or [860, 5509]
:return:
dict -> {
'sub_labels': ['足球', '网球'],
'token_ids': [[6639, 4413], [5381, 4413]]
}
'''
# 如果传入的label为id列表,则通过tokenizer转换回字符串
if type(label) == list:
# 移除label中的pad_token_id,直到label中不再包含它
while self.tokenizer.pad_token_id in label:
label.remove(self.tokenizer.pad_token_id)
# 将处理后的id列表转换为tokens,并拼接成字符串
label = ''.join(self.tokenizer.convert_ids_to_tokens(label))
# print(f'label-->{label}')
# 检查转换后的label是否在标签字典中,如果不在则抛出异常
if label not in self.label_dict:
raise ValueError(f'Lable Error: "{label}" 不在 label_dict {list(self.label_dict)}.')

# 从标签字典中获取与label对应的子标签
sub_labels = self.label_dict[label]
# print(f'sub_labels-->{sub_labels}')
# 将子标签作为结果的一个部分存储在字典中
ret = {'sub_labels': sub_labels}

# 对每个子标签进行token化,不含特殊符号
token_ids = [token_id for token_id in self.tokenizer(sub_labels, add_special_tokens=False)['input_ids']]
# print(f'token_ids-->{token_ids}')
# 遍历所有的token_ids,进行截断与补齐操作
for i in range(len(token_ids)):
# 对标签进行截断
token_ids[i] = token_ids[i][:self.max_label_len]
# 如果长度不足max_label_len,则使用pad_token_id进行补齐
if len(token_ids[i]) < self.max_label_len:
token_ids[i] = token_ids[i] + [self.tokenizer.pad_token_id] * (self.max_label_len - len(token_ids[i]))
# 将处理后的token_ids存入ret字典中
ret['token_ids'] = token_ids
return ret

def batch_find_sub_labels(self, label: List[Union[list, str]]):
'''
批量找到子标签。
:param label: 标签列表, [[4510, 5554], [860, 5509]] or ['体育', '电脑']
:return:
list -> [
{
'sub_labels': ['笔记本', '电脑'],
'token_ids': [[5011, 6381, 3315], [4510, 5554]]
},
...
]
'''
return [self.find_sub_labels(l) for l in label]

def get_common_sub_str(self,
str1: str,
str2: str
):
'''
寻找最大公共子串(连续子序列)。
:param str1: abcd
:param str2: abadbcdba
:return:
'''
# 初始化两个字符串的长度
lstr1, lstr2 = len(str1), len(str2)
# 生成0矩阵,为方便后续计算,比字符串长度多了一列,生成一个 lstr1+1 * lstr2+1 的二维矩阵
record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]
# 初始化最长匹配对应在str1中的最后一位
p = 0
# 初始化最长匹配长度
maxNum = 0
# 遍历两个字符串,寻找最长公共子串
for i in range(1, lstr1 + 1):
for j in range(1, lstr2 + 1):
# 当发现相同字符时
if str1[i - 1] == str2[j - 1]:
# 在record矩阵中记录匹配长度
record[i][j] = record[i - 1][j - 1] + 1
# 更新最长匹配长度和对应在str1中的最后一位
if record[i][j] > maxNum:
maxNum = record[i][j]
p = i

# 返回最长公共子串和其长度
return str1[p - maxNum:p], maxNum

def hard_mapping(self, sub_label: str):
'''
强匹配函数,当模型生成的子label不存在时,通过最大公共子串找到重合度最高的主label。
:param sub_label: 子label
:return: 主label
'''
# 初始化变量label和max_overlap_str,用于记录最大重叠度的标签和对应的重叠度值
label, max_overlap_str = '', 0

# 遍历标签字典,其中main_label是主标签,sub_labels是与主标签相关的子标签列表
for main_label, sub_labels in self.label_dict.items():
overlap_num = 0
# 对于每个子标签,计算它与当前推理标签之间的最长公共子串长度总和
for s_label in sub_labels:
# 累加每个子标签与当前推理标签之间的最长公共子串长度
overlap_num += self.get_common_sub_str(sub_label, s_label)[1]

# 如果当前的重叠度大于或等于之前的最大重叠度,则更新最大重叠度和对应的标签
if overlap_num >= max_overlap_str:
max_overlap_str = overlap_num
label = main_label

return label

def find_main_label(self,
sub_label: Union[list, str],
hard_mapping=True
):
'''
通过子标签找到父标签。
:param sub_label: 子标签, 文本型 或 id_list, e.g. -> '苹果' or [5741, 3362]
:param hard_mapping: 当生成的词语不存在时,是否一定要匹配到一个最相似的label。
:return:
dict -> {
'label': '水果',
'token_ids': [3717, 3362]
}
'''
# 如果传入的sub_label为id列表,则通过tokenizer转换回字符串
if type(sub_label) == list:
pad_token_id = self.tokenizer.pad_token_id
# 移除列表中的[PAD]token,避免影响后续处理
while pad_token_id in sub_label:
sub_label.remove(pad_token_id)
# 将id列表转换为对应的字符串
sub_label = ''.join(self.tokenizer.convert_ids_to_tokens(sub_label))
# print(f'sub_label-->{sub_label}')
# 初始化主标签为'无',作为未找到特定子标签时的默认值
main_label = '无'

# 遍历标签字典,寻找与子标签匹配的主标签
for label, sub_labels in self.label_dict.items():
# 检查当前子标签是否在字典中对应的子标签列表中
if sub_label in sub_labels:
# 当找到匹配时,更新主标签并终止循环
main_label = label
break
# print(f'main_label-->{main_label}')
# 如果主标签为'无'且启用了强匹配功能,则使用强匹配方法更新主标签
if main_label == '无' and hard_mapping:
main_label = self.hard_mapping(sub_label)
# print('强匹配', main_label)
ret = {
'label': main_label,
'token_ids': self.tokenizer(main_label, add_special_tokens=False)['input_ids']
}
return ret

def batch_find_main_label(self,
sub_label: List[Union[list, str]],
hard_mapping=True
):
'''
批量通过子标签找父标签。
:param sub_label: 子标签列表, ['苹果', ...] or [[5741, 3362], ...]
:param hard_mapping:
:return:
list: [
{
'label': '水果',
'token_ids': [3717, 3362]
},
...
]
'''
return [self.find_main_label(l, hard_mapping) for l in sub_label]


if __name__ == '__main__':
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
verbalizer = Verbalizer(
verbalizer_file=pc.verbalizer,
tokenizer=tokenizer,
max_label_len=2)
print(f'label_dict-->{verbalizer.label_dict}')

# 查找单个子标签
label = '电脑'
ret = verbalizer.find_sub_labels(label)
print(f'ret-->{ret}')
print('*' * 80)

# 查找多个子标签
labels = ['电脑', '衣服']
# labels = [[4510, 5554], [6132, 3302]]
result = verbalizer.batch_find_sub_labels(labels)
print(f'result-->{result}')
print('*' * 80)

# 查找单个子标签对应的父标签
# sub_label = [4510, 5554]
sub_label = '衣电'
ret = verbalizer.find_main_label(sub_label)
print(f'ret-->{ret}')
print('*' * 80)

# 查找多个子标签对应的父标签
# sub_label = ['衣服', '牛奶']
sub_label = [[6132, 3302], [5885, 4281]]
ret = verbalizer.batch_find_main_label(sub_label, hard_mapping=True)
print(f'ret-->{ret}')

print结果显示:

1
2
3
4
5
6
7
8
label_dict-->{'电脑': ['电脑'], '水果': ['水果'], '平板': ['平板'], '衣服': ['衣服'], '酒店': ['酒店'], '洗浴': ['洗浴'], '书籍': ['书籍'], '蒙牛': ['蒙牛'], '手机': ['手机'], '电器': ['电器']}
ret-->{'sub_labels': ['电脑'], 'token_ids': [[4510, 5554]]}
********************************************************************************
result-->[{'sub_labels': ['电脑'], 'token_ids': [[4510, 5554]]}, {'sub_labels': ['衣服'], 'token_ids': [[6132, 3302]]}]
********************************************************************************
ret-->{'label': '电器', 'token_ids': [4510, 1690]}
********************************************************************************
ret-->[{'label': '衣服', 'token_ids': [6132, 3302]}, {'label': '蒙牛', 'token_ids': [5885, 4281]}]

1.2 common_utils.py

  • 目的:定义损失函数、将mask_position位置的token logits转换为token的id。
  • 脚本里面包含两个函数:mlm_loss()以及convert_logits_to_ids()
  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import torch

def mlm_loss(logits,
mask_positions,
sub_mask_labels,
cross_entropy_criterion,
device):
'''
计算指定位置的mask token的output与label之间的cross entropy loss。
:param logits: (torch.tensor): 模型原始输出 -> (batch_size, seq_len, vocab_size)
:param mask_positions: (torch.tensor): mask token的位置 -> (batch_size, mask_label_num)
:param sub_mask_labels: (list): mask token的sub label, 由于每个label的sub_label数目不同,所以这里是个变长的list,
e.g. -> [
[[2398, 3352]],
[[2398, 3352], [3819, 3861]]
]
:param cross_entropy_criterion: (CrossEntropyLoss): CE Loss计算器
:param device: (str): cpu还是gpu
:return: CE Loss
'''
'''
获取logits的尺寸信息,为后续计算做准备
logits.size()返回一个包含三个维度的元组
第一个维度(batch_size)代表批次大小,即一次处理的数据批次包含的样本数量
第二个维度(seq_len)代表序列长度,即每个样本中包含的序列元素数量
第三个维度(vocab_size)代表词汇表大小,即每个序列元素可能的类别数量
'''
batch_size, seq_len, vocab_size = logits.size()
# print(f'模型预测结果logits-->{logits.size()}')
# print(f'mask_positions-->{mask_positions.shape}')
# print(f'sub_mask_labels-->{sub_mask_labels}')

# 初始化loss变量为None,用于后续可能的损失计算
loss = None
# 遍历 logits、sub_mask_labels 和 mask_positions 的元素
for single_value in zip(logits, sub_mask_labels, mask_positions):
# 获取当前token的 logits
single_logits = single_value[0]
# print(f'single_logits-->{single_logits.shape}') # 形状[512, 21128]
# 获取当前token的 sub_mask_labels
single_sub_mask_labels = single_value[1]
# print(f'single_sub_mask_labels-->{single_sub_mask_labels}')
# 获取当前token的 mask_positions
single_mask_positions = single_value[2]
# print(f'single_mask_positions-->{single_mask_positions}') # 形状size[2]-->具体值([5, 6])

# 从单个序列的logits中,提取出被掩码位置的logits
single_mask_logits = single_logits[single_mask_positions] # (mask_label_num, vocab_size)
# 打印被掩码位置logits的形状,以验证其是否符合预期
# print(f'single_mask_logits-->{single_mask_logits.shape}')

# 模型训练时主标签对应的所有子标签都有相似的特征值, 在计算CE Loss时,需要将每个子标签的对应的损失求平均,因此需要将预测的概率值进行扩展
# 对单个 single_mask_logits 进行扩展,使其在第一个维度上重复,以匹配 single_sub_mask_labels 的数量
# 使用repeat设置重复的倍数 (sub_label_num, mask_label_num, vocab_size)
single_mask_logits = single_mask_logits.repeat(len(single_sub_mask_labels), 1, 1)
# 打印重复后的single_mask_logits的形状,以便调试和验证重复操作的效果
# print(f'重复后的single_mask_logits-->{single_mask_logits.shape}')

# 将三维张量调整为二维,以便计算损失
single_mask_logits = single_mask_logits.reshape(-1, vocab_size) # (sub_label_num * mask_label_num, vocab_size)
# print(f'调整成二维后的single_mask_logits-->{single_mask_logits.shape}')

# 将子标签转换为张量,并调整形状以匹配模型预测的结果
single_sub_mask_labels = torch.LongTensor(single_sub_mask_labels).to(device) # (sub_label_num, mask_label_num)
# 计算损失值时真实子标签维度为1维,因此需要将其展平以匹配模型预测的结果
single_sub_mask_labels = single_sub_mask_labels.reshape(-1, 1).squeeze() # (sub_label_num * mask_label_num)
# print(f'真实子标签mask值:single_sub_mask_labels-->{single_sub_mask_labels.shape}')
# print(f'真实子标签mask值:single_sub_mask_labels-->{single_sub_mask_labels}')

# 计算当前批次所有子标签的损失
cur_loss = cross_entropy_criterion(single_mask_logits, single_sub_mask_labels)
# 计算当前批次所有子标签的平均损失
cur_loss = cur_loss / len(single_sub_mask_labels)

# 如果当前损失loss未被初始化(即为None),则将其设置为当前批次的损失cur_loss
if not loss:
loss = cur_loss
# 如果当前损失loss已经存在,则将当前批次的损失cur_loss累加到loss中
else:
loss += cur_loss

# 计算平均损失:将累计的损失loss除以批次大小batch_size
loss = loss / batch_size # (1,)
return loss

def convert_logits_to_ids(
logits: torch.tensor,
mask_positions: torch.tensor):
'''
输入Language Model的词表概率分布(LMModel的logits),将mask_position位置的token logits转换为token的id。
:param logits: (torch.tensor): model output -> (batch, seq_len, vocab_size) [8, 512, 21128]
:param mask_positions: (torch.tensor): mask token的位置 -> (batch, mask_label_num) [8, 2]
:return: 对应mask position上最大概率的推理token -> (batch, mask_label_num) [8, 2]
'''
# 获取标签的长度,mask_positions.size()返回的是一个包含维度的元组,[1]表示获取第二个维度的大小
label_length = mask_positions.size()[1]
# print(f'label_length-->{label_length}')

# 获取批次大小、序列长度和词汇表大小,logits.size()返回的是一个包含维度的元组
batch_size, seq_len, vocab_size = logits.size()

# 初始化一个空列表,用于存储重塑后的 mask_positions
mask_positions_after_reshaped = []

# print(f'mask_positions.detach().cpu().numpy().tolist()-->{mask_positions.detach().cpu().numpy().tolist()}')
# 遍历每个批次的mask_positions
for batch, mask_pos in enumerate(mask_positions.detach().cpu().numpy().tolist()):
# 遍历每个mask位置
for pos in mask_pos:
# 将批次号和序列中的mask位置结合起来,得到重塑后的mask_positions
mask_positions_after_reshaped.append(batch * seq_len + pos)
# print(f'mask_positions_after_reshaped-->{mask_positions_after_reshaped}')
# print(f'原始的logits-->{logits.shape}')

# 将原始的logits重塑为(batch_size * seq_len, vocab_size)的形状
logits = logits.reshape(batch_size * seq_len, -1) # (batch_size * seq_len, vocab_size)
# print(f'改变原始模型输出的结果形状-->{logits.shape}')

# 从重塑后的logits中,选择出被掩码位置的logits
mask_logits = logits[mask_positions_after_reshaped]
# print(f'被掩码位置的logits-->{mask_logits.shape}')

# 获取每个样本mask位置所预测的tokens
predict_tokens = mask_logits.argmax(dim=-1) # (batch * label_num)
# print(f'获取每个样本mask位置预测的tokens', predict_tokens)

# 将每个样本mask位置预测的tokens重塑为(batch, label_num)的形状
predict_tokens = predict_tokens.reshape(-1, label_length) # (batch, label_num)
# print(f'predict_tokens-->{predict_tokens}')

return predict_tokens

1.3 metirc_utils.py

  • 目的:定义(多)分类问题下的指标评估(acc, precision, recall, f1)。
  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from typing import List
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, f1_score
from sklearn.metrics import recall_score, confusion_matrix


class ClassEvaluator(object):
def __init__(self):
# 初始化真实结果和预测结果的列表
self.goldens = [] # 存储真实结果数据
self.predictions = [] # 存储预测结果数据

def add_batch(self,
pred_batch: List[List],
gold_batch: List[List]):
'''
添加一个batch中的prediction和gold列表,用于后续统一计算。
:param pred_batch: (list): 模型预测标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]
:param gold_batch: (list): 真实标签标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]
:return:
'''
# 确保预测批次和真实批次长度一致,这是后续处理的前提条件
assert len(pred_batch) == len(gold_batch)
# print(f'pred_batch0-->{pred_batch}')
# print(f'gold_batch0-->{gold_batch}')

# 若遇到多个子标签构成一个标签的情况
# 判断gold_batch的第一个元素是否为列表或元组类型
if type(gold_batch[0]) in [list, tuple]:
# 如果是,则将pred_batch中的每个元素转换为字符串后拼接起来
pred_batch = [''.join([str(e) for e in ele]) for ele in pred_batch]
# 同样地,也将gold_batch中的每个元素转换为字符串后拼接起来
gold_batch = [''.join([str(e) for e in ele]) for ele in gold_batch]
# print(f'pred_batch-->{pred_batch}')
# print(f'gold_batch-->{gold_batch}')

# 将真实结果的批次数据添加到self.goldens列表中
self.goldens.extend(gold_batch)
# print(f'self.goldens-->{self.goldens}')
# 将预测结果的批次数据添加到self.predictions列表中
self.predictions.extend(pred_batch)
# print(f'self.predictions-->{self.predictions}')

def compute(self, round_num=2) -> dict:
'''
根据当前类中累积的变量值,计算当前的P, R, F1。
:param round_num: (int): 计算结果保留小数点后几位, 默认小数点后2位。
:return:
dict -> {
'accuracy': 准确率,
'precision': 精准率,
'recall': 召回率,
'f1': f1值,
'class_metrics': {
'0': {
'precision': 该类别下的precision,
'recall': 该类别下的recall,
'f1': 该类别下的f1
},
...
}
}
'''
# print(f'self.goldens-->{self.goldens}')
# print(f'self.predictions-->{self.predictions}')
# 初始化类别集合、类别指标字典和结果字典,用于存储全局指标
# 将 self.goldens 和 self.predictions 的集合合并,并进行排序,结果存储在变量 classes 中。
classes = sorted(list(set(self.goldens) | set(self.predictions)))
class_metrics = {}
res = {}
# print(f'classes-->{classes}')

# 构建全局指标
# 计算并存储全局准确率
res['accuracy'] = round(accuracy_score(self.goldens, self.predictions), round_num)
# 计算并存储全局精确率
res['precision'] = round(precision_score(self.goldens, self.predictions, average='weighted'), round_num)
# 计算并存储全局召回率
res['recall'] = round(recall_score(self.goldens, self.predictions, average='weighted'), round_num)
# 计算并存储全局F1分数
res['f1'] = round(f1_score(self.goldens, self.predictions, average='weighted'), round_num)
# print(f'res-->{res}')

try:
# 计算混淆矩阵,并将其转换为numpy数组,形状为(n_class, n_class)
conf_matrix = np.array(confusion_matrix(self.goldens, self.predictions))
# print(f'conf_matrix-->{conf_matrix}')
# 确保混淆矩阵的维度与类别数量匹配
assert conf_matrix.shape[0] == len(classes)
# 遍历每个类别,计算精确度(precision)、召回率(recall)和F1分数(f1)
for i in range(conf_matrix.shape[0]):
# 计算当前类别的精确度
precision = 0 if sum(conf_matrix[:, i]) == 0 else (conf_matrix[i, i] / sum(conf_matrix[:, i]))
# 计算当前类别的召回率
recall = 0 if sum(conf_matrix[i, :]) == 0 else (conf_matrix[i, i] / sum(conf_matrix[i, :]))
# 计算当前类别的F1分数
f1 = 0 if (precision + recall) == 0 else (2 * precision * recall / (precision + recall))
# 将当前类别的精确度、召回率和F1分数保存到字典中
class_metrics[classes[i]] = {
'precision': round(precision, round_num),
'recall': round(recall, round_num),
'f1': round(f1, round_num)
}
# 将所有类别的指标保存到结果字典中
res['class_metrics'] = class_metrics
except Exception as e:
# 异常处理:当计算类别指标时发生异常,打印警告信息和相关数据
print(f'[Warning] Something wrong when calculate class_metrics: {e}')
print(f'--> goldens: {set(self.goldens)}')
print(f'--> predictions: {set(self.predictions)}')
print(f'--> diff elements: {set(self.predictions) - set(self.goldens)}')
# 将结果字典中的类别指标设置为空字典
res['class_metrics'] = {}

return res

def reset(self):
"""
重置积累的数值。
"""
self.goldens = []
self.predictions = []


if __name__ == '__main__':
metric = ClassEvaluator()
metric.add_batch(
[['财', '经'], ['财', '经'], ['体', '育'], ['体', '育'], ['计', '算', '机']],
[['体', '育'], ['财', '经'], ['体', '育'], ['计', '算', '机'], ['计', '算', '机']],
)
# metric.add_batch(
# [0, 0, 1, 1, 0],
# [1, 1, 1, 0, 0]
# )
res = metric.compute()
print(res)

print代码结果:

1
2
3
4
5
6
7
8
9
{'accuracy': 0.6, 
'precision': 0.7,
'recall': 0.6,
'f1': 0.6,
'class_metrics':
{'体育': {'precision': np.float64(0.5), 'recall': np.float64(0.5), 'f1': np.float64(0.5)},
'计算机': {'precision': np.float64(1.0), 'recall': np.float64(0.5), 'f1': np.float64(0.67)},
'财经': {'precision': np.float64(0.5), 'recall': np.float64(1.0), 'f1': np.float64(0.67)}
}}

2、实现模型训练函数,验证函数

  • 目的:实现模型的训练和验证,脚本里面包含两个函数:model2train()和evaluate_model()

  • 代码路径:llm_tuning/prompt_tasks/P_Tuning/train.py

    代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import os
import time

import torch
from tqdm import tqdm
from transformers import AutoModelForMaskedLM, AutoTokenizer, get_scheduler

from prompt_tasks.PET.data_handle.data_loader import get_data
from prompt_tasks.PET.pet_config import ProjectConfig
from prompt_tasks.PET.utils.common_utils import mlm_loss, convert_logits_to_ids
from prompt_tasks.PET.utils.metirc_utils import ClassEvaluator
from prompt_tasks.PET.utils.verbalizer import Verbalizer

pc = ProjectConfig()


def model2train():
# 加载训练数据和验证数据
train_dataloader, dev_dataloader = get_data()

# 加载预训练模型
model = AutoModelForMaskedLM.from_pretrained(pc.pre_model).to(pc.device)
# print(f'预训练模型带MLM头的-->{model}')
# 加载分词器
tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)
# 加载映射词表
verbalizer = Verbalizer(verbalizer_file=pc.verbalizer,
tokenizer=tokenizer,
max_label_len=pc.max_label_len)
# print(f'verbalizer-->{verbalizer.label_dict}')

# 不需要权重衰减的参数
no_decay = ["bias", "LayerNorm.weight"]
# print(type(model.parameters()))
# 定义优化器的参数组,以便对模型的不同部分应用不同的权重衰减
optimizer_grouped_parameters = [
# 第一组参数:包含所有适用权重衰减的模型参数
{
"params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
"weight_decay": pc.weight_decay,
},
# 第二组参数:包含所有不适用权重衰减的模型参数
{
"params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
"weight_decay": 0.0,
},
]
# 初始化AdamW优化器,用于模型参数的优化
# AdamW是Adam算法的变体,加入了权重衰减(L2正则化),有助于防止过拟合
# 参数optimizer_grouped_parameters是分组的模型参数,允许对不同的参数应用不同的学习率或正则化强度
optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=pc.learning_rate)

# 根据训练轮数计算最大训练步数,以便于scheduler动态调整lr
num_update_steps_per_epoch = len(train_dataloader)
# 指定总的训练步数,它会被学习率调度器用来确定学习率的变化规律,确保学习率在整个训练过程中得以合理地调节
max_train_steps = pc.epochs * num_update_steps_per_epoch
# 计算预热阶段的训练步数,用于初始化学习率调度
warm_steps = int(pc.warmup_ratio * max_train_steps) # 预热阶段的训练步数
# 创建学习率调度器,使用线性调度策略,根据训练的进行逐步调整学习率
lr_scheduler = get_scheduler(
name='linear',
optimizer=optimizer,
num_warmup_steps=warm_steps,
num_training_steps=max_train_steps)

# 初始化损失列表,用于记录训练过程中的损失值
loss_list = []
# 记录训练开始的时间,用于计算训练时长
tic_train = time.time()
# 创建分类评估器,用于评估模型性能
metric = ClassEvaluator()
# 定义损失函数,用于计算模型预测值与真实标签之间的差异
criterion = torch.nn.CrossEntropyLoss()
# 初始化训练次数和最佳F1分数,用于跟踪训练进度和模型性能
global_step, best_f1 = 0, 0

print('开始训练:')
for epoch in range(pc.epochs):
for batch in tqdm(train_dataloader, desc='模型训练'):
# print(f'batch-->{batch}')
# 将批次数据输入模型,获取logits
logits = model(input_ids=batch['input_ids'].to(pc.device),
token_type_ids=batch['token_type_ids'].to(pc.device),
attention_mask=batch['attention_mask'].to(pc.device)).logits
# print(f'logits->{logits.shape}')

# 真实标签
mask_labels = batch['mask_labels'].numpy().tolist()
# print(f'mask_labels--->{mask_labels}')
# 提取子标签
sub_labels = verbalizer.batch_find_sub_labels(mask_labels)
# print(f'sub_labels--->{sub_labels}')
# 获取子标签的token_ids
sub_labels = [ele['token_ids'] for ele in sub_labels]
# print(f'sub_labels_token_ids--->{sub_labels}')

# 计算掩码语言模型的损失值
loss = mlm_loss(logits,
batch['mask_positions'].to(pc.device),
sub_labels,
criterion,
pc.device)
# print(f'计算损失值-->{loss}')
# 清零优化器的梯度
optimizer.zero_grad()
# 反向传播计算梯度
loss.backward()
# 更新模型参数
optimizer.step()
# 更新学习率调度器
lr_scheduler.step()

# 将损失值添加到损失列表中
loss_list.append(loss)
# 训练次数增加1
global_step += 1
# 打印训练日志
if global_step % pc.logging_steps == 0:
time_diff = time.time() - tic_train
loss_avg = sum(loss_list) / len(loss_list)
print("global step %d, epoch: %d, loss: %.5f, speed: %.2f step/s"
% (global_step, epoch, loss_avg, pc.logging_steps / time_diff))
tic_train = time.time()
# 模型验证
# 使用给定的模型、评估指标、数据加载器、分词器和标记化器进行模型评估
acc, precision, recall, f1, class_metrics = evaluate_model(model,
metric,
dev_dataloader,
tokenizer,
verbalizer)

# 打印评估结果中的精确度、召回率和F1分数
print("验证集的 precision: %.5f, recall: %.5f, F1: %.5f" % (precision, recall, f1))
# 如果当前F1分数高于最佳F1分数,则更新最佳F1分数和相关模型及分词器
if f1 > best_f1:
print(
f"最好的f1分数被更新: {best_f1:.5f} --> {f1:.5f}"
)
print(f'每种类型的Metrics为: {class_metrics}')
# 更新当前最佳的F1分数
best_f1 = f1
# 定义当前保存模型和分词器的目录
cur_save_dir = os.path.join(pc.save_dir, "model_best")
print(cur_save_dir)
# 检查并创建保存目录(如果不存在)
if not os.path.exists(cur_save_dir):
os.makedirs(cur_save_dir)
# 保存模型到指定目录
model.save_pretrained(cur_save_dir)
# 保存分词器到指定目录
tokenizer.save_pretrained(cur_save_dir)
tic_train = time.time()

print('训练结束')


def evaluate_model(model,
metric,
data_loader,
tokenizer,
verbalizer):
'''
在测试集上评估当前模型的训练效果。
:param model: 当前模型
:param metric: 评估指标类(metric)
:param data_loader: 测试集的dataloader
:param tokenizer: 分词器
:param verbalizer: 映射表
:return:
'''
model.eval()
metric.reset()

with torch.no_grad():
for step, batch in enumerate(tqdm(data_loader, desc='模型验证')):
# print(f'batch-->{batch}')
logits = model(input_ids=batch['input_ids'].to(pc.device),
token_type_ids=batch['token_type_ids'].to(pc.device),
attention_mask=batch['attention_mask'].to(pc.device)).logits
# print(f'验证集模型预测的结果————>{logits.shape}')

mask_labels = batch['mask_labels'].numpy().tolist() # (batch, label_num)
# print(f"mask_labels-0-->{mask_labels}")

for i in range(len(mask_labels)): # 去掉label中的[PAD] token
while tokenizer.pad_token_id in mask_labels[i]:
mask_labels[i].remove(tokenizer.pad_token_id)
# print(f'mask_labels-1-->{mask_labels}')
# 将mask_labels id转换为文字
mask_labels = [''.join(tokenizer.convert_ids_to_tokens(t)) for t in mask_labels]
# print(f'真实的结果主标签:mask_labels_str-->{mask_labels}')

# 获取模型预测的子标签
predictions = convert_logits_to_ids(logits,
batch['mask_positions']).cpu().numpy().tolist() # (batch, label_num)
# print(f'模型预测的子标签的结果-->{predictions}')

# 根据模型预测的子标签,找到子label属于的主label
predictions = verbalizer.batch_find_main_label(predictions) # 找到子label属于的主label
# print(f"找到模型预测的子标签对应的主标签的结果-->{predictions}')")

# 获得预测的主标签名
predictions = [ele['label'] for ele in predictions]
# print(f"只获得预测的主标签的结果string-->{predictions}')")

# 调用add_batch方法, 将模型预测的主标签与真实主标签保存到metric属性中
metric.add_batch(pred_batch=predictions, gold_batch=mask_labels)
eval_metric = metric.compute()
model.train()

return eval_metric['accuracy'], eval_metric['precision'], \
eval_metric['recall'], eval_metric['f1'], \
eval_metric['class_metrics']


if __name__ == '__main__':
model2train()
  • 输出结果:
1
2
3
4
5
6
7
8
...
global step 350, epoch: 43, loss: 0.10804, speed: 1.20 step/s
global step 360, epoch: 44, loss: 0.10504, speed: 1.22 step/s
global step 370, epoch: 46, loss: 0.10220, speed: 1.21 step/s
global step 380, epoch: 47, loss: 0.09951, speed: 1.20 step/s
global step 390, epoch: 48, loss: 0.09696, speed: 1.20 step/s
global step 400, epoch: 49, loss: 0.09454, speed: 1.22 step/s
Evaluation precision: 0.76000, recall: 0.70000, F1: 0.70000

  • 结论: BERT+P-Tuning模型在训练集上的表现是Precion: 76%
  • 注意:本项目中只用了60条样本,在接近400条样本上精确率就已经达到了76%,如果想让指标更高,可以扩增样本。

提升模型性能:

增加训练数据集(100条左右的数据):

1
2
3
4
5
手机	外观时尚新潮,适合年轻人展现个性。
手机 屏幕显示效果非常出色,观看视频和浏览网页很舒适。
电脑 使用了一段时间的这款电脑,硬盘采用WD,运行流畅无卡顿,温度控制较好,性价比令人满意。
手机 手机反应灵敏,操作界面简洁易用,非常满意。
电器 产品性能稳定,很不错哦!购买时有点担心,但收到货后发现是正品,大家可以放心购买。

修改验证集脏数据

1
2
3
4
# 原始标签和评论文本内容不符
平板 手机很好,就是客服垃圾特别是元豆
# 修改后
手机 手机很好,就是客服垃圾特别是元豆

模型表现:

Evaluation precision: 0.79000, recall: 0.70000, F1: 0.71000

3、实现模型预测函数

  • 目的:加载训练好的模型并测试效果
  • 代码路径:llm_tuning/prompt_tasks/P_Tuning/inference.py

​ 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import time
from typing import List
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM

from prompt_tasks.P_Tuning.data_handle.data_preprocess import convert_example
from prompt_tasks.P_Tuning.ptune_config import ProjectConfig
from prompt_tasks.P_Tuning.utils.common_utils import convert_logits_to_ids
from prompt_tasks.P_Tuning.utils.verbalizer import Verbalizer

pc = ProjectConfig()

model_path = os.path.join(pc.save_dir, 'model_best')
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForMaskedLM.from_pretrained(model_path).to(pc.device)
model.eval()

max_label_len = 2 # 标签最大长度
p_embedding_num = 6
verbalizer = Verbalizer(
verbalizer_file=pc.verbalizer,
tokenizer=tokenizer,
max_label_len=max_label_len)

def inference(contents: List[str]):
'''
推理函数,输入原始句子,输出mask label的预测值。
:param contents:
:return: 描原始句子列表。
'''
with (torch.no_grad()):
start_time = time.time()

# 将内容封装为示例字典,准备进行标记化处理
examples = {'text': contents}

# 对示例进行标记化处理,返回标记化输出
tokenized_output = convert_example(
examples,
tokenizer,
max_seq_len=128,
max_label_len=max_label_len,
p_embedding_num=p_embedding_num,
train_mode=False,
return_tensor=True)

# 使用模型进行预测,获取logits
logits = model(input_ids=tokenized_output['input_ids'].to(pc.device),
token_type_ids=tokenized_output['token_type_ids'].to(pc.device),
attention_mask=tokenized_output['attention_mask'].to(pc.device)).logits

# 将logits转换为预测标签
predictions = convert_logits_to_ids(logits, tokenized_output['mask_positions']
).cpu().numpy().tolist() # (batch, label_num)

# 找到子label属于的主label
predictions = verbalizer.batch_find_main_label(predictions)

# 提取预测的标签
predictions = [ele['label'] for ele in predictions]

used = time.time() - start_time
print(f'耗时 {used} 秒。')
return predictions


if __name__ == '__main__':
contents = [
'天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐',
'环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐',
"物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小",
"福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多来点其他小点,饼干一直也是大爱,那天好像也没看到",
"服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适,晚上休息很安逸,隔音效果不错赞,下次还会来"
]
print("针对下面的文本评论,请分别给出对应所属类别:")
res = inference(contents)
print('推断的类别为:', res)
new_dict = {}
for i in range(len(contents)):
new_dict[contents[i]] = res[i]
print(f'new_dict-->{new_dict}')
  • 结果展示
1
2
3
4
5
6
7
8
9
10
11
12
13
{
'天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出
行,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐': '酒店',
'环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不
错,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐': '酒店',
'物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小': '衣服',
'福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了
,之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做
柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多来
点其他小点,饼干一直也是大爱,那天好像也没看到': '平板',
'服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒
适,晚上休息很安逸,隔音效果不错赞,下次还会来': '酒店'
}