Using the transformers Library

1 Using the transformers Library

1.1 What is the transformers library?

  • An open-source library for loading and sharing pretrained models
  • Hosts a wide range of open-source models (including large language models) and datasets (a short loading example follows below)
  • Accessing https://huggingface.co may require a proxy/VPN from mainland China
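
  As a small illustration (not from the original notes), pipeline()/from_pretrained() accept either a model id on the Hub (downloaded on first use) or a local directory that already contains the model files; the Hub id below is only a placeholder:

    from transformers import pipeline

    # Load by Hub id (downloads on first use); 'some-user/some-chinese-sentiment-model' is a placeholder id.
    # clf = pipeline(task='sentiment-analysis', model='some-user/some-chinese-sentiment-model')
    # Load from a local directory, as the examples in these notes do:
    clf = pipeline(task='sentiment-analysis', model='../model/chinese_sentiment')
    print(clf('这家店的菜很好吃'))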

1.2 Using the transformers library

# Create a conda virtual environment
conda create --name <env_name> python=3.10
# Activate the virtual environment
conda activate <env_name>
# Install the transformers library
pip install transformers -i https://mirrors.aliyun.com/pypi/simple/
# Install the datasets library
pip install datasets -i https://mirrors.aliyun.com/pypi/simple/
# Install torch (CPU or GPU build; this command installs the CPU build)
pip install torch -i https://mirrors.aliyun.com/pypi/simple/
  • Pipeline approach

    • Text classification task

      import torch
      from transformers import pipeline
      import numpy as np


      # Text classification task
      def dm01():
          # Load a pretrained model
          # either from a local directory,
          # or from the Hub by model id, e.g. techthiyanes/xxx
          # task: the task name is fixed by the library, not user-defined
          model = pipeline(task='sentiment-analysis', model='../model/chinese_sentiment')
          # Run inference
          result = model('我爱北京天安门,天安门上太阳升。')
          print('result--->', result)


      if __name__ == '__main__':
          dm01()
    • Feature extraction task

      # Feature extraction: no task head, equivalent to using BERT as an embedding model
      def dm02():
          model = pipeline(task='feature-extraction', model='../model/bert-base-chinese')
          # Run inference
          output = model('人生该如何起头')
          # the input is encoded as [CLS]xxx[SEP] (with an extra [SEP] between sentence pairs)
          # [CLS] -> representation of the whole sequence
          print('output--->', type(output), np.array(output).shape, output)


      if __name__ == '__main__':
          dm02()
    • Fill-mask (cloze) task

      # Fill-mask task: MLM, one of the BERT pretraining objectives
      def dm03():
          model = pipeline(task='fill-mask', model='../model/bert-base-chinese')
          # Run inference
          # [MASK]: the masked-token placeholder
          output = model('我想明天去[MASK]家吃饭。')
          print('output--->', output)
    • Reading comprehension (QA) task

      # Question answering / reading comprehension
      def dm04():
          model = pipeline(task='question-answering', model='../model/chinese_pretrain_mrc_roberta_wwm_ext_large')
          # Prepare the data
          context = '我叫张三,我是一个程序员,我的喜好是打篮球。'
          questions = ['我是谁?', '我是做什么的?', '我的爱好是什么?']
          # Run inference
          output = model(context=context, question=questions)
          print('output--->', output)
    • Text summarization task

      # Text summarization
      def dm05():
          model = pipeline(task='summarization', model='../model/distilbart-cnn-12-6')
          context = "BERT is a transformers model pretrained on a large corpus of English data " \
                    "in a self-supervised fashion. This means it was pretrained on the raw texts " \
                    "only, with no humans labelling them in any way (which is why it can use lots " \
                    "of publicly available data) with an automatic process to generate inputs and " \
                    "labels from those texts. More precisely, it was pretrained with two objectives:Masked " \
                    "language modeling (MLM): taking a sentence, the model randomly masks 15% of the " \
                    "words in the input then run the entire masked sentence through the model and has " \
                    "to predict the masked words. This is different from traditional recurrent neural " \
                    "networks (RNNs) that usually see the words one after the other, or from autoregressive " \
                    "models like GPT which internally mask the future tokens. It allows the model to learn " \
                    "a bidirectional representation of the sentence.Next sentence prediction (NSP): the models" \
                    " concatenates two masked sentences as inputs during pretraining. Sometimes they correspond to " \
                    "sentences that were next to each other in the original text, sometimes not. The model then " \
                    "has to predict if the two sentences were following each other or not."
          # Run inference
          output = model(context)
          print('output--->', output)
    • NER task

      # Named entity recognition
      def dm06():
          model = pipeline(task='ner', model='../model/roberta-base-finetuned-cluener2020-chinese')
          # Run inference (predict() is equivalent to calling the pipeline object directly)
          output = model.predict('我爱北京天安门,天安门上太阳升。')
          print('output--->', output)
  • AutoModel approach

    • Text classification task

      import torch
      from transformers import AutoTokenizer  # tokenizer
      from transformers import AutoModelForSequenceClassification  # text / sequence classification


      def dm01():
          # todo:1- create the tokenizer
          tokenizer = AutoTokenizer.from_pretrained('../model/chinese_sentiment')
          # print('tokenizer--->', tokenizer)
          # todo:2- create the model
          model = AutoModelForSequenceClassification.from_pretrained('../model/chinese_sentiment')
          # print('model--->', model)
          # print('model--->', model.bert.embeddings)
          # todo:3- preprocess the input into the format the model expects
          input = '人生该如何起头'
          # return_tensors: 'pt' -> return PyTorch tensors
          # the text is encoded as [CLS]xxx[SEP]
          input = tokenizer.encode(text=input, padding=True, truncation=True, return_tensors='pt')
          print('input--->', input.shape, input)
          # the tokenizer turns the text into tensor data
          # todo:4- run inference; switch the model to eval mode
          model.eval()
          # output is not a plain tensor but a model-output object
          output = model(input)
          print('output--->', output)
          print('output.logits--->', output.logits)
          # output2: with return_dict=False the model returns a tuple (tensor,)
          # output2[0]: index into that tuple
          output2 = model(input, return_dict=False)
          print('output2--->', output2, output2[0])
          # todo:5- convert the logits into a class label
          output_label = torch.argmax(output.logits, dim=-1).item()
          print('output_label--->', output_label)
          # model.config.id2label -> dict {id: label, ...}
          print(model.config.id2label[output_label])
    • Feature extraction task

      # Feature extraction: no task head, no output layer
      from transformers import AutoModel
      def dm02():
          tokenizer = AutoTokenizer.from_pretrained('../model/bert-base-chinese')
          model = AutoModel.from_pretrained('../model/bert-base-chinese')
          # Hidden size of the model
          print(model.config.hidden_size)
          # print('model--->', model)
          # todo: preprocess the input
          text = ['你是谁', '人生该如何起头']
          input = tokenizer.encode_plus(text=text,
                                        return_tensors='pt',
                                        truncation=True,
                                        padding='max_length',  # 'max_length' refers to the max_length argument below
                                        max_length=30)
          # encode_plus returns a dict:
          # input_ids: the encoded token ids
          # token_type_ids: used by BERT's NSP objective, 0 -> first sentence, 1 -> second sentence
          # attention_mask: 1 -> real token, 0 -> padding token
          print('input--->', type(input), input)
          # todo: run inference
          model.eval()
          # read the values from the dict by key
          input_ids = input['input_ids']
          token_type_ids = input['token_type_ids']
          attention_mask = input['attention_mask']
          # pass keyword arguments: BertModel's positional order is (input_ids, attention_mask, token_type_ids)
          output = model(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
          # **input unpacks the dict into keyword arguments: key1=value1, key2=value2
          # *args unpacks lists/tuples: *(1, 2, 3) -> 1, 2, 3
          # output = model(**input)  # same as the call above
          # the model returns an output object
          print('output--->', output)
          # access attributes on the object
          print(output.last_hidden_state.shape, output.last_hidden_state)
          # pooler_output: the [CLS] hidden state (which carries the sentence semantics) passed through a pooling/linear layer
          print(output.pooler_output.shape, output.pooler_output)
    • Fill-mask (cloze) task

      # Fill-mask task
      from transformers import AutoModelForMaskedLM
      def dm03():
          tokenizer = AutoTokenizer.from_pretrained('../model/chinese-bert-wwm')
          model = AutoModelForMaskedLM.from_pretrained('../model/chinese-bert-wwm')

          # Preprocess the input
          input = tokenizer.encode_plus(text='我想明天去[MASK]家吃饭.', return_tensors='pt')
          print('input--->', input)

          # Run inference
          model.eval()
          # unpack the dict into keyword arguments
          output = model(**input)
          print('output--->', output)
          # access attributes on the output object
          logits = output.logits
          print('logits--->', logits.shape, logits)

          # Get the prediction at the [MASK] position
          # logits[0][6]: scores at the mask position, one value per vocabulary entry (21128 for this model)
          print('logits[0][6]--->', logits[0][6])
          mask_idx = torch.argmax(logits[0][6], dim=-1).item()
          print('mask_idx--->', mask_idx)
          # use the tokenizer to convert the id back to a token
          print(tokenizer.convert_ids_to_tokens([mask_idx]))
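
      dm03() hard-codes index 6 for the [MASK] position. As a small generalization (a sketch, not in the original notes, reusing the input, logits and tokenizer variables inside dm03), the position can be looked up from the input ids instead:

      # Locate the [MASK] position instead of hard-coding it (sketch; would sit inside dm03)
      mask_pos = (input['input_ids'][0] == tokenizer.mask_token_id).nonzero(as_tuple=True)[0].item()
      pred_id = torch.argmax(logits[0, mask_pos], dim=-1).item()
      print(tokenizer.convert_ids_to_tokens([pred_id]))
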
    • Reading comprehension (QA) task

      # Question answering / reading comprehension
      from transformers import AutoModelForQuestionAnswering
      def dm04():
          tokenizer = AutoTokenizer.from_pretrained('../model/chinese_pretrain_mrc_roberta_wwm_ext_large')
          model = AutoModelForQuestionAnswering.from_pretrained('../model/chinese_pretrain_mrc_roberta_wwm_ext_large')

          # Prepare the QA data
          context = '我叫张三 我是一个程序员 我的喜好是打篮球'
          questions = ['我是谁?', '我是做什么的?', '我的爱好是什么?']

          # Run inference for each question
          model.eval()
          for question in questions:
              # Preprocess the input
              input = tokenizer.encode_plus(question, context, return_tensors='pt')
              print('input--->', input)
              # Run inference
              output = model(**input)
              # the model returns a score for each token being the start / end of the answer span
              print('output--->', output)
              # +1: Python slicing is left-closed, right-open
              # the returned indices are positions within input_ids, not token ids!
              start_idx, end_idx = torch.argmax(output.start_logits, dim=-1), torch.argmax(output.end_logits, dim=-1) + 1
              print('start_idx--->', start_idx, 'end_idx--->', end_idx)
              # Get the predicted answer tokens
              # slice of input_ids between the predicted positions
              tmp_idx_list = input['input_ids'][0][start_idx.item():end_idx.item()]
              print('tmp_idx_list--->', tmp_idx_list)
              # convert the ids back to tokens
              print(tokenizer.convert_ids_to_tokens(tmp_idx_list))
    • Text summarization task

      # Text summarization task
      from transformers import AutoModelForSeq2SeqLM
      def dm05():
          tokenizer = AutoTokenizer.from_pretrained('../model/distilbart-cnn-12-6')
          model = AutoModelForSeq2SeqLM.from_pretrained('../model/distilbart-cnn-12-6')
          text = "BERT is a transformers model pretrained on a large corpus of English data " \
                 "in a self-supervised fashion. This means it was pretrained on the raw texts " \
                 "only, with no humans labelling them in any way (which is why it can use lots " \
                 "of publicly available data) with an automatic process to generate inputs and " \
                 "labels from those texts. More precisely, it was pretrained with two objectives:Masked " \
                 "language modeling (MLM): taking a sentence, the model randomly masks 15% of the " \
                 "words in the input then run the entire masked sentence through the model and has " \
                 "to predict the masked words. This is different from traditional recurrent neural " \
                 "networks (RNNs) that usually see the words one after the other, or from autoregressive " \
                 "models like GPT which internally mask the future tokens. It allows the model to learn " \
                 "a bidirectional representation of the sentence.Next sentence prediction (NSP): the models" \
                 " concatenates two masked sentences as inputs during pretraining. Sometimes they correspond to " \
                 "sentences that were next to each other in the original text, sometimes not. The model then " \
                 "has to predict if the two sentences were following each other or not."
          # Preprocess the input
          # calling the tokenizer object combines the functionality of encode_plus and batch_encode_plus
          input = tokenizer([text], return_tensors='pt')
          print('input--->', input)
          # Run inference
          model.eval()
          # generate() wraps the decoding loop and directly returns the predicted token ids
          output = model.generate(**input)
          print('output--->', output)
          # convert_ids_to_tokens would return the raw subword strings from vocab.json (a different tokenization scheme)
          # print(tokenizer.convert_ids_to_tokens(output[0]))
          # use the tokenizer's decode method to get normal text back
          summary_text = []
          for g in output:
              token = tokenizer.decode(g, skip_special_tokens=True, clean_up_tokenization_spaces=True)
              summary_text.append(token)
          print('summary_text--->', summary_text)
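
      generate() above uses the model's default generation settings. Length and beam-search parameters can be passed explicitly; a hedged sketch with illustrative values (not from the original notes):

      # Control the summary length and use beam search (illustrative values)
      output = model.generate(**input, max_length=60, min_length=20, num_beams=4)
      print(tokenizer.decode(output[0], skip_special_tokens=True))
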
    • NER task

      # Named entity recognition
      from transformers import AutoConfig
      from transformers import AutoModelForTokenClassification
      def dm06():
          tokenizer = AutoTokenizer.from_pretrained('../model/roberta-base-finetuned-cluener2020-chinese')
          model = AutoModelForTokenClassification.from_pretrained('../model/roberta-base-finetuned-cluener2020-chinese')
          # Load the model configuration object
          config = AutoConfig.from_pretrained('../model/roberta-base-finetuned-cluener2020-chinese')

          # Preprocess the input
          input = tokenizer('我爱北京天安门,天安门上太阳升', return_tensors='pt')
          print('input--->', input)

          # Run inference
          model.eval()
          output = model(input['input_ids'])
          # print('output--->', output)
          # logits shape [1, 17, 32]: the model has a token-classification head with 32 entity labels
          logits = output.logits
          print('logits--->', logits.shape, logits)

          # Recover the actual tokens
          token = tokenizer.convert_ids_to_tokens(input['input_ids'][0])
          print('token--->', token)

          # Collect (token, label) pairs: [(token, label), ...]
          result = []
          for t, l in zip(token, logits[0]):
              # Skip special tokens such as [CLS] and [SEP]
              # print(tokenizer.all_special_tokens)
              if t in tokenizer.all_special_tokens:
                  continue

              # Index of the highest score
              idx = torch.argmax(l, dim=-1)
              # Map the index to its label
              label = config.id2label[idx.item()]
              # Append to the result list
              result.append((t, label))
          print('result--->', result)
  • Model-specific class approach

    • Fill-mask (cloze) task

      from transformers import BertTokenizer
      from transformers import BertForMaskedLM
      from transformers import BertConfig
      import torch


      def dm03():
          tokenizer = BertTokenizer.from_pretrained('../model/chinese-bert-wwm')
          model = BertForMaskedLM.from_pretrained('../model/chinese-bert-wwm')

          # Preprocess the input
          input = tokenizer.encode_plus(text='我想明天去[MASK]家吃饭.', return_tensors='pt')
          print('input--->', input)

          # Run inference
          model.eval()
          # unpack the dict into keyword arguments
          output = model(**input)
          print('output--->', output)
          # access attributes on the output object
          logits = output.logits
          print('logits--->', logits.shape, logits)

          # Get the prediction at the [MASK] position
          # logits[0][6]: scores at the mask position, one value per vocabulary entry (21128)
          print('logits[0][6]--->', logits[0][6])
          mask_idx = torch.argmax(logits[0][6], dim=-1).item()
          print('mask_idx--->', mask_idx)
          # use the tokenizer to convert the id back to a token
          print(tokenizer.convert_ids_to_tokens([mask_idx]))


      if __name__ == '__main__':
          dm03()
  • Comparison of tokenizer(), encode(), encode_plus(), and batch_encode_plus()

    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("../model/bert-base-chinese")

    text1 = "今天天气不错"  # one sentence
    text2 = ['nlp是什么?', 'nlp是自然语言处理']  # two sentences, or one sentence pair
    text3 = [['nlp是什么?', 'nlp是自然语言处理'], ['AI是什么?', 'AI是人工智能']]  # two sentences each, or two sentence pairs

    # encode(): accepts only str or list[str]; returns a tensor of token ids
    print('=======================encode=======================')
    input1 = tokenizer.encode(text1,
                              padding=True,
                              truncation=True,
                              return_tensors='pt')
    print('input1--->', input1)
    input2 = tokenizer.encode(text2,
                              padding=True,
                              truncation=True,
                              return_tensors='pt')
    print('input2--->', input2)

    print('=======================encode_plus=======================')
    # encode_plus(): accepts only str or list[str]; returns a dict {input_ids: ..., token_type_ids: ..., attention_mask: ...}
    input1 = tokenizer.encode_plus(text1,
                                   padding=True,
                                   truncation=True,
                                   return_tensors='pt')
    print('input1--->', input1)
    input2 = tokenizer.encode_plus(text2,
                                   padding=True,
                                   truncation=True,
                                   return_tensors='pt')
    print('input2--->', input2)

    print('=======================batch_encode_plus=======================')
    # batch_encode_plus(): batch processing; accepts only list[str] or list[list[str], ...]; returns a dict {input_ids: ..., token_type_ids: ..., attention_mask: ...}
    input2 = tokenizer.batch_encode_plus(text2,
                                         padding=True,
                                         truncation=True,
                                         return_tensors='pt')
    print('input2--->', input2)
    input3 = tokenizer.batch_encode_plus(text3,
                                         padding=True,
                                         truncation=True,
                                         return_tensors='pt')
    print('input3--->', input3)

    print('=======================tokenizer()=======================')
    # tokenizer(): batch processing; wraps encode_plus and batch_encode_plus; accepts str, list[str] or list[list[str], ...]; returns a dict {input_ids: ..., token_type_ids: ..., attention_mask: ...}
    input1 = tokenizer(text1,
                       padding=True,
                       truncation=True,
                       return_tensors='pt')
    print('input1--->', input1)
    input2 = tokenizer(text2,
                       padding=True,
                       truncation=True,
                       return_tensors='pt')
    print('input2--->', input2)
    input3 = tokenizer(text3,
                       padding=True,
                       truncation=True,
                       return_tensors='pt')
    print('input3--->', input3)
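
    To go the other way (from ids back to text), the tokenizer also provides convert_ids_to_tokens() and decode(). A short sketch (not in the original notes) reusing input1 from the tokenizer() call above:

    # Round-trip: ids -> tokens / plain text
    ids = input1['input_ids'][0].tolist()
    print(tokenizer.convert_ids_to_tokens(ids))             # tokens, including [CLS]/[SEP]
    print(tokenizer.decode(ids, skip_special_tokens=True))  # plain text without special tokens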

2 Chinese Text Classification Case Study

  • Task overview

    A binary classification problem: predict from a review text whether the review is positive or negative.

    # Import the required packages
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader
    from datasets import load_dataset
    from transformers import BertTokenizer, BertModel
    from torch.optim import AdamW
    import time

    # Select the device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Create the tokenizer
    my_tokenizer = BertTokenizer.from_pretrained('model/bert-base-chinese')
    # Create the pretrained model
    my_pretrained_model = BertModel.from_pretrained('model/bert-base-chinese').to(device)
    # print('my_pretrained_model--->', my_pretrained_model)
    # Hidden size of the pretrained model
    hidden_size = my_pretrained_model.config.hidden_size
    # print('hidden_size--->', hidden_size)
  • Loading the dataset

    # Load the datasets
    def get_dataset():
        """
        # Without a split argument, load_dataset returns a DatasetDict, e.g. {'train': dataset1, 'test': dataset2, ...}
        my_train_dataset = load_dataset('csv',
                                        data_files='data/train.csv')
        print('my_train_dataset--->', my_train_dataset)
        """
        # arg 1: file format
        # arg 2: file path
        # arg 3: which key of the DatasetDict to return
        my_dataset_train = load_dataset('csv',
                                        data_files='data/train.csv',
                                        split='train')
        print('my_train_dataset--->', my_dataset_train)
        # indexing/slicing returns a dict of columns, e.g. {'label': [...], 'text': [text1, text2, text3]}
        # print('my_train_dataset[:3]--->', my_dataset_train[:3])

        # Test set
        my_dataset_test = load_dataset('csv',
                                       data_files='data/test.csv',
                                       split='train')
        # print('my_dataset_test--->', my_dataset_test)

        # Validation set
        my_dataset_valid = load_dataset('csv',
                                        data_files='data/validation.csv',
                                        split='train')
        # print('my_dataset_valid--->', my_dataset_valid)
        return my_dataset_train, my_dataset_test, my_dataset_valid
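
    get_dataset() assumes CSV files with a label column and a text column (the keys that collate_fn reads below). A tiny self-contained check of that layout, with made-up rows for illustration only (not the real dataset):

    # Write a toy CSV with the assumed layout and load it the same way (illustrative only)
    import csv

    with open('data/toy.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['label', 'text'])
        writer.writerow([1, '房间干净, 服务很好'])
        writer.writerow([0, '性价比太低, 不会再来'])

    toy_dataset = load_dataset('csv', data_files='data/toy.csv', split='train')
    print(toy_dataset[:2])  # {'label': [1, 0], 'text': [...]}
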
  • Creating the data loader

    # Define the collate function used by the DataLoader
    # the function must take one argument: the list of samples in the current batch
    def collate_fn(data):
        """
        Process one batch of samples.
        :param data: the batch, a list of dicts: [{'label': xxx, 'text': xxx}, {...}, ...]
        :return: the processed batch
        """
        # print('data--->', len(data), data)
        # Collect the texts and labels of the batch
        # data: a list of dicts
        # item: one dict
        # item['text']: read the value by key
        sents = [item['text'] for item in data]
        labels = [item['label'] for item in data]
        print('sents--->', sents)
        print('labels--->', labels)

        # Tokenize the texts into the format the pretrained model expects
        inputs = my_tokenizer(sents,
                              truncation=True,
                              padding='max_length',
                              max_length=500,
                              return_tensors='pt')
        print('inputs--->', inputs)
        # Move the tensors in inputs onto the device (GPU if available)
        input_ids = inputs['input_ids'].to(device)
        token_type_ids = inputs['token_type_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)

        # Convert the labels to a long tensor on the device
        labels = torch.tensor(labels, dtype=torch.long, device=device)
        # print('labels--->', labels)
        return input_ids, token_type_ids, attention_mask, labels


    if __name__ == '__main__':
        # todo:1- load the datasets
        my_dataset_train, my_dataset_test, my_dataset_valid = get_dataset()
        # todo:2- create the DataLoader
        # drop_last: whether to drop the last incomplete batch
        # collate_fn: a function that processes each batch of samples
        my_dataloader = DataLoader(dataset=my_dataset_train,
                                   shuffle=True,
                                   batch_size=8,
                                   drop_last=True,
                                   collate_fn=collate_fn)
        # Iterate over the DataLoader
        for i, (input_ids, token_type_ids, attention_mask, labels) in enumerate(my_dataloader):
            print('i--->', i)
            print('input_ids--->', input_ids.shape, input_ids)
            print('token_type_ids--->', token_type_ids.shape, token_type_ids)
            print('attention_mask--->', attention_mask.shape, attention_mask)
            print('labels--->', labels.shape, labels)
            break
  • Defining the downstream task model

    # Define the downstream neural network model
    class MyModel(nn.Module):
        # todo:1- init method
        def __init__(self):
            super().__init__()
            # Linear layer that produces the binary classification result
            # hidden_size: hidden size of the pretrained model
            # 2: two classes
            self.fc = nn.Linear(hidden_size, 2)

        # todo:2- forward method
        def forward(self, input_ids, token_type_ids, attention_mask):
            # todo:3- run the pretrained model to get the representations
            with torch.no_grad():  # freeze all pretrained parameters (option 2)
                output = my_pretrained_model(input_ids=input_ids,
                                             token_type_ids=token_type_ids,
                                             attention_mask=attention_mask)
            print('output--->', output)
            # the last hidden state of the [CLS] token represents the whole text
            print(output.last_hidden_state.shape)
            # the [CLS] representation for each of the 8 sentences in the batch
            print(output.last_hidden_state[:, 0].shape, output.last_hidden_state[:, 0])
            # Use pooler_output as the text representation and feed it to the downstream classifier
            output = self.fc(output.pooler_output)
            return output


    if __name__ == '__main__':
        # todo:1- load the datasets
        my_dataset_train, my_dataset_test, my_dataset_valid = get_dataset()
        # todo:2- create the DataLoader
        # drop_last: whether to drop the last incomplete batch
        # collate_fn: a function that processes each batch of samples
        my_dataloader = DataLoader(dataset=my_dataset_train,
                                   shuffle=True,
                                   batch_size=8,
                                   drop_last=True,
                                   collate_fn=collate_fn)
        # Freeze the pretrained model parameters (option 1):
        # for param in my_pretrained_model.parameters():
        #     param.requires_grad = False
        # todo:3- create the downstream model
        my_model = MyModel().to(device)
        # Iterate over the DataLoader
        for i, (input_ids, token_type_ids, attention_mask, labels) in enumerate(my_dataloader):
            output_y = my_model(input_ids, token_type_ids, attention_mask)
            print('output_y--->', output_y.shape, output_y)
            break
  • Model training

    # Model training
    def train():
        # todo:1- load the datasets
        my_dataset_train, my_dataset_test, my_dataset_valid = get_dataset()
        # todo:2- create the DataLoader
        # drop_last: whether to drop the last incomplete batch
        # collate_fn: a function that processes each batch of samples
        my_dataloader = DataLoader(dataset=my_dataset_train,
                                   shuffle=True,
                                   batch_size=8,
                                   drop_last=True,
                                   collate_fn=collate_fn)
        # todo:3- create the model
        my_model = MyModel().to(device)
        # todo:4- create the optimizer
        my_optimizer = AdamW(params=my_model.parameters(), lr=1e-5)
        # todo:5- create the loss function
        my_cross_entropy = nn.CrossEntropyLoss()
        # Switch the model to training mode
        my_model.train()
        epochs = 3
        # Loop over epochs
        for epoch in range(epochs):
            starttime = int(time.time())
            # Loop over the DataLoader
            for i, (input_ids, token_type_ids, attention_mask, labels) in enumerate(my_dataloader, start=1):
                # Forward pass
                output_y = my_model(input_ids, token_type_ids, attention_mask)
                # Compute the loss
                loss = my_cross_entropy(output_y, labels)
                # Zero the gradients
                my_optimizer.zero_grad()
                # Backward pass
                loss.backward()
                # Update the parameters
                my_optimizer.step()
                # Report the accuracy every 5 iterations
                if i % 5 == 0:
                    # index of the max logit: 0 or 1
                    out = output_y.argmax(dim=1)  # [8, 2] --> (8,)
                    # out == labels: boolean tensor
                    # sum(): True counts as 1, False as 0
                    acc = (out == labels).sum().item() / len(labels)
                    print('epoch:%d iter:%d loss:%.6f acc:%.3f time:%d'
                          % (epoch, i, loss.item(), acc, int(time.time()) - starttime))
            torch.save(my_model.state_dict(), './model/my_model_%d.bin' % (epoch + 1))
  • Model inference

    def inference():
        my_dataset_train, my_dataset_test, my_dataset_valid = get_dataset()
        my_dataloader = DataLoader(dataset=my_dataset_test,
                                   shuffle=True,
                                   collate_fn=collate_fn,
                                   drop_last=True,
                                   batch_size=8)
        my_model = MyModel().to(device)
        my_model.load_state_dict(torch.load('train_model/my_model_class_3.bin',
                                            map_location=lambda storage, loc: storage),
                                 strict=False)
        my_model.eval()

        correct = 0
        total = 0
        for i, (input_ids, token_type_ids, attention_mask, labels) in enumerate(my_dataloader):
            with torch.no_grad():
                output_y = my_model(input_ids, token_type_ids, attention_mask)

            out = torch.argmax(output_y, dim=-1)
            correct += (out == labels).sum().item()
            total += len(labels)
            # Print the running accuracy every 5 iterations
            if i % 5 == 0:
                print(correct / total, end=" ")
                # input_ids[0]: the text (review) of the first sample in the batch
                print(my_tokenizer.decode(input_ids[0], skip_special_tokens=True), end=" ")
                print('predicted vs. true:', out[0].item(), labels[0].item())


    if __name__ == '__main__':
        # Model training
        # train()
        inference()

3 Chinese Fill-Mask (Cloze) Case Study

  • Task overview

    • Predict the token at the [MASK] position in the text. The model outputs one probability per vocabulary entry (21128 values for this model); the token with the highest probability is taken as the prediction for the [MASK] position (sketched in the snippet below).

    • MLM is itself one of BERT's pretraining objectives; in other words, the BERT model was trained with this task.
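
    Schematically, the prediction step is an argmax over the vocabulary dimension at the masked position. A minimal sketch (not in the original notes), assuming a logits tensor from a masked-LM head and the my_tokenizer object defined in the next code block:

    def predict_masked_token(logits, mask_pos):
        # logits: [batch, seq_len, vocab_size]; take the argmax over the vocabulary at the masked position
        pred_id = torch.argmax(logits[0, mask_pos], dim=-1).item()
        return my_tokenizer.convert_ids_to_tokens([pred_id])[0]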

  • Loading the dataset

    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader
    from datasets import load_dataset
    from transformers import BertTokenizer, BertModel
    from torch.optim import AdamW
    import time

    # Check whether a GPU is available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')

    # Load the vocabulary and tokenizer
    my_tokenizer = BertTokenizer.from_pretrained('model/bert-base-chinese')

    # Load the pretrained model
    my_model_pretrained = BertModel.from_pretrained('model/bert-base-chinese').to(device)

    # Hidden size of the pretrained model
    hidden_size = my_model_pretrained.config.hidden_size
    print('hidden_size--->', hidden_size)  # 768


    # Load the datasets
    def get_dataset():
        my_train_dataset = load_dataset('csv', data_files='data/train.csv', split='train')
        # print('my_train_dataset--->', my_train_dataset)
        # Filter out samples whose text is 32 characters or shorter
        # x -> each sample is a dict with two keys: label, text
        my_train_dataset = my_train_dataset.filter(lambda x: len(x['text']) > 32)
        # print('my_train_dataset--->', my_train_dataset)

        # Test set
        my_test_dataset = load_dataset('csv', data_files='data/test.csv', split='train')
        my_test_dataset = my_test_dataset.filter(lambda x: len(x['text']) > 32)
        # print('my_test_dataset--->', my_test_dataset)
        return my_train_dataset, my_test_dataset


    if __name__ == '__main__':
        my_train_dataset, my_test_dataset = get_dataset()
  • Creating the data loader

    # Define the collate function
    def collate_fn(data):
        print('data--->', len(data), data)
        sents = [i['text'] for i in data]
        # print('sents--->', sents)

        # Tokenize the batch of texts
        inputs = my_tokenizer(sents,
                              truncation=True,
                              padding='max_length',
                              max_length=500,
                              return_tensors='pt')

        # Get input_ids, token_type_ids, attention_mask
        input_ids = inputs['input_ids'].to(device)
        token_type_ids = inputs['token_type_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)
        # print('input_ids--->', input_ids[:, 16])

        # Mask the 17th token (index 16) of every sentence
        # keep the original 17th token as the label for the loss computation
        labels = input_ids[:, 16].clone()
        # print('labels--->', labels)
        # Replace the 17th token with the id of [MASK]
        # id of the [MASK] token
        # print(my_tokenizer.mask_token_id)
        # the [MASK] token itself
        # print(my_tokenizer.mask_token)
        # the vocabulary
        # print(my_tokenizer.get_vocab())
        # print(my_tokenizer.get_vocab()[my_tokenizer.mask_token])
        input_ids[:, 16] = my_tokenizer.mask_token_id
        # print('input_ids--->', input_ids[:, 16])
        return input_ids, token_type_ids, attention_mask, labels


    if __name__ == '__main__':
        my_train_dataset, my_test_dataset = get_dataset()
        my_dataloader = DataLoader(dataset=my_train_dataset,
                                   shuffle=True,
                                   batch_size=8,
                                   drop_last=True,
                                   collate_fn=collate_fn)
        for i, (input_ids, token_type_ids, attention_mask, labels) in enumerate(my_dataloader):
            print('input_ids--->', input_ids)
            break
  • Defining the downstream task model
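
    This part is left blank in the notes. A minimal sketch following the same pattern as the classification case in section 2: keep the pretrained BERT frozen and add one linear head that projects the hidden state at the masked position (index 16, as set in collate_fn) onto the vocabulary. Treat it as an outline under those assumptions, not the original solution:

    # Minimal downstream MLM head (sketch): linear layer from hidden_size to vocab size
    class MyModel(nn.Module):
        def __init__(self):
            super().__init__()
            # one score per vocabulary entry (21128 for bert-base-chinese)
            self.fc = nn.Linear(hidden_size, my_tokenizer.vocab_size)

        def forward(self, input_ids, token_type_ids, attention_mask):
            with torch.no_grad():  # keep the pretrained BERT frozen
                output = my_model_pretrained(input_ids=input_ids,
                                             token_type_ids=token_type_ids,
                                             attention_mask=attention_mask)
            # hidden state at the masked position (index 16, as set in collate_fn)
            mask_hidden = output.last_hidden_state[:, 16]
            return self.fc(mask_hidden)  # [batch, vocab_size]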

  • Model training
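
    Also left blank in the notes. A training loop sketched along the lines of section 2's train(), with cross-entropy between the vocab-sized logits and the saved token ids; the hyperparameters and save path are placeholders:

    # Training sketch: same loop as the classification case, loss over vocab-sized logits
    def train():
        my_train_dataset, my_test_dataset = get_dataset()
        my_dataloader = DataLoader(dataset=my_train_dataset, shuffle=True,
                                   batch_size=8, drop_last=True, collate_fn=collate_fn)
        my_model = MyModel().to(device)
        my_optimizer = AdamW(params=my_model.parameters(), lr=1e-5)
        my_cross_entropy = nn.CrossEntropyLoss()
        my_model.train()
        for epoch in range(3):
            for i, (input_ids, token_type_ids, attention_mask, labels) in enumerate(my_dataloader, start=1):
                output_y = my_model(input_ids, token_type_ids, attention_mask)  # [batch, vocab_size]
                loss = my_cross_entropy(output_y, labels)  # labels: original token ids at index 16
                my_optimizer.zero_grad()
                loss.backward()
                my_optimizer.step()
                if i % 5 == 0:
                    acc = (output_y.argmax(dim=-1) == labels).sum().item() / len(labels)
                    print('epoch:%d iter:%d loss:%.6f acc:%.3f' % (epoch, i, loss.item(), acc))
            torch.save(my_model.state_dict(), './model/my_mlm_model_%d.bin' % (epoch + 1))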

  • Model inference
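
    A matching inference sketch: collate_fn masks the 17th token of each test sentence, the model predicts it, and the predicted id is decoded back to a token; the checkpoint path is a placeholder:

    # Inference sketch: compare the predicted token at index 16 with the original one
    def inference():
        my_train_dataset, my_test_dataset = get_dataset()
        my_dataloader = DataLoader(dataset=my_test_dataset, shuffle=True,
                                   batch_size=8, drop_last=True, collate_fn=collate_fn)
        my_model = MyModel().to(device)
        my_model.load_state_dict(torch.load('./model/my_mlm_model_3.bin', map_location='cpu'))
        my_model.eval()
        with torch.no_grad():
            for input_ids, token_type_ids, attention_mask, labels in my_dataloader:
                output_y = my_model(input_ids, token_type_ids, attention_mask)
                pred_ids = output_y.argmax(dim=-1)
                print('true --->', my_tokenizer.convert_ids_to_tokens(labels.tolist()))
                print('pred --->', my_tokenizer.convert_ids_to_tokens(pred_ids.tolist()))
                break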

4 Chinese Sentence Relation (NSP-Style) Case Study

  • Task overview
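
    • A sentence-pair task in the spirit of BERT's NSP objective (summarized from the MyDataset code below): each text is split after the 22nd character into sentence 1 and sentence 2; with 50% probability sentence 2 is swapped for the tail of another random sample. The model must predict label 1 (sentence 2 really follows sentence 1) or label 0 (it does not).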

  • Loading the dataset

    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader
    from torch.utils.data import Dataset
    from datasets import load_dataset
    from transformers import BertTokenizer, BertModel
    from torch.optim import AdamW
    import random
    import time

    # Check whether a GPU is available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')

    # Load the vocabulary and tokenizer
    my_tokenizer = BertTokenizer.from_pretrained('model/bert-base-chinese')

    # Load the pretrained model
    my_model_pretrained = BertModel.from_pretrained('model/bert-base-chinese').to(device)

    # Hidden size of the pretrained model
    hidden_size = my_model_pretrained.config.hidden_size
    print('hidden_size--->', hidden_size)  # 768


    class MyDataset(Dataset):
        def __init__(self, data_csv_path):
            # Create the source dataset object
            my_dataset_temp = load_dataset('csv', data_files=data_csv_path, split="train")
            # print('my_dataset_temp--->', my_dataset_temp)

            # Keep only samples whose text is longer than 44 characters
            self.my_dataset = my_dataset_temp.filter(lambda x: len(x['text']) > 44)
            # print('self.my_dataset--->', self.my_dataset)
            # print('self.my_dataset[0:3]-->', self.my_dataset[0:3])

            self.length = len(self.my_dataset)

        def __len__(self):
            return self.length

        def __getitem__(self, index):
            # Get the sample at this index
            print(self.my_dataset[index])
            # Get the text field of the sample
            print(self.my_dataset[index]['text'])
            # Split the text into two sentences:
            # the first 22 characters form sentence 1, the rest form sentence 2
            sentence1 = self.my_dataset[index]['text'][:22]
            sentence2 = self.my_dataset[index]['text'][22:]
            # print('sentence1--->', sentence1)
            # print('sentence2--->', sentence2)
            label = 1
            # With 50% probability, replace sentence 2 so it is not the continuation of sentence 1
            if random.randint(0, 1) == 0:
                # Take a random sample and use everything after its 22nd character as sentence 2 (not the true continuation)
                idx = random.randint(0, self.length - 1)
                sentence2 = self.my_dataset[idx]['text'][22:]
                # print('sentence2--->', sentence2)
                label = 0
            return sentence1, sentence2, label
  • Creating the data loader

    def collate_fn(data):
        print('data--->', data)
        sents = [i[:2] for i in data]  # (sentence 1, sentence 2) pairs
        labels = [i[2] for i in data]
        print('sents--->', sents)
        print('labels--->', labels)
        # Encode the sentence pairs so the loop below can unpack tensors
        # (this completion follows the earlier collate_fn pattern; max_length=50 is an assumed value)
        inputs = my_tokenizer.batch_encode_plus(sents,
                                                truncation=True,
                                                padding='max_length',
                                                max_length=50,
                                                return_tensors='pt')
        input_ids = inputs['input_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)
        token_type_ids = inputs['token_type_ids'].to(device)
        labels = torch.tensor(labels, dtype=torch.long, device=device)
        # the return order matches how the batch is unpacked in __main__ below
        return input_ids, attention_mask, token_type_ids, labels


    if __name__ == '__main__':
        my_dataset = MyDataset('data/train.csv')
        print(my_dataset)
        # print(my_dataset[0])
        # Iterate through a DataLoader
        my_dataloader = DataLoader(my_dataset,
                                   batch_size=8,
                                   collate_fn=collate_fn,
                                   shuffle=True,
                                   drop_last=True)
        print('my_dataloader--->', my_dataloader)
        for (input_ids, attention_mask, token_type_ids, labels) in my_dataloader:
            print(my_tokenizer.decode(input_ids[0]))  # decode the first sample of the batch
            print(input_ids.shape, attention_mask.shape, token_type_ids.shape, labels)
            break
  • Defining the downstream task model
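
    Left blank in the notes. A minimal sketch following section 2: the pooled [CLS] representation of the sentence pair goes through one linear layer with 2 outputs, with the pretrained BERT kept frozen. An outline under those assumptions, not the original solution:

    # Downstream NSP-style head (sketch): pooler_output -> 2 classes
    class MyModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.fc = nn.Linear(hidden_size, 2)

        def forward(self, input_ids, attention_mask, token_type_ids):
            with torch.no_grad():  # keep the pretrained BERT frozen
                output = my_model_pretrained(input_ids=input_ids,
                                             attention_mask=attention_mask,
                                             token_type_ids=token_type_ids)
            return self.fc(output.pooler_output)  # [batch, 2]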

  • Model training
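
    Also left blank; the loop is essentially the same as section 2's train(), only the Dataset/DataLoader and the model above change. A compact sketch (epochs, learning rate, and save path are placeholders):

    # Training sketch: identical loop to section 2, reusing the pieces above
    def train():
        my_dataloader = DataLoader(MyDataset('data/train.csv'), batch_size=8,
                                   collate_fn=collate_fn, shuffle=True, drop_last=True)
        my_model = MyModel().to(device)
        my_optimizer = AdamW(my_model.parameters(), lr=1e-5)
        my_cross_entropy = nn.CrossEntropyLoss()
        my_model.train()
        for epoch in range(3):
            for input_ids, attention_mask, token_type_ids, labels in my_dataloader:
                loss = my_cross_entropy(my_model(input_ids, attention_mask, token_type_ids), labels)
                my_optimizer.zero_grad()
                loss.backward()
                my_optimizer.step()
        torch.save(my_model.state_dict(), './model/my_nsp_model.bin')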

  • Model inference
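
    A matching inference sketch that reports accuracy on the test pairs (the checkpoint path is a placeholder):

    # Inference sketch: accuracy over the test sentence pairs
    def inference():
        my_dataloader = DataLoader(MyDataset('data/test.csv'), batch_size=8,
                                   collate_fn=collate_fn, shuffle=True, drop_last=True)
        my_model = MyModel().to(device)
        my_model.load_state_dict(torch.load('./model/my_nsp_model.bin', map_location='cpu'))
        my_model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for input_ids, attention_mask, token_type_ids, labels in my_dataloader:
                out = my_model(input_ids, attention_mask, token_type_ids).argmax(dim=-1)
                correct += (out == labels).sum().item()
                total += len(labels)
        print('accuracy--->', correct / total)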