使用 TensorFlow 2.0 的 Transformer 聊天机器人教程
2019 年 5 月 23 日
Bryan M. LiFOR.ai 撰写的客座文章

如今,使用人工神经网络创建聊天机器人越来越流行,然而,教计算机进行自然对话非常困难,通常需要大型而复杂的语言模型。

随着 TensorFlow 2.0 中所有更改和改进的实现,我们可以轻松构建复杂的模型。在本博文中,我们将演示如何构建 Transformer 聊天机器人。本博文中使用的所有代码都可以在这个 colab 笔记本 中找到,该笔记本将端到端运行(包括安装 TensorFlow 2.0)。

本文假设您对 文本生成注意力Transformer 有一定的了解。在本教程中,我们将重点介绍
  • 使用 TensorFlow Datasets 预处理康奈尔电影对话语料库,并使用 tf.data 创建输入管道
  • 使用模型子类化实现多头注意力
  • 使用函数式 API 实现 Transformer
input: where have you been ?
output: i m not talking about that .
input: i am not crazy , my mother had me tested .
output: i m not sure . i m not hungry .
input: i m not sure . i m not hungry .
output: you re a liar .
input: you re a liar .
output: i m not going to be a man . i m gonna need to go to school .
在电影对话语料库上训练的 Transformer 聊天机器人的示例对话。

Transformer

Transformer 在论文 Attention is All You Need 中提出,是一种完全基于自注意力机制的神经网络架构,并且具有很高的并行性。 Transformer 模型使用自注意力层的堆栈来处理可变大小的输入,而不是 RNN 或 CNN。这种通用架构具有许多优点
  • 它对数据中的时间/空间关系不作任何假设。这非常适合处理一组对象。
  • 层输出可以并行计算,而不是像 RNN 那样串行计算。
  • 远距离项目可以相互影响彼此的输出,而无需经过许多循环步骤或卷积层。
  • 它可以学习长距离依赖关系。
这种架构的缺点
  • 对于时间序列,时间步长的输出是根据整个历史而不是仅根据输入和当前隐藏状态计算的。这可能效率较低。
  • 如果输入确实具有时间/空间关系(如文本),则必须添加一些位置编码,否则模型实际上会看到一个词袋。
如果您有兴趣了解更多关于 Transformer 的信息,请查看 The Annotated TransformerIllustrated Transformer.

数据集

我们使用 康奈尔电影对话语料库 作为我们的数据集,该语料库包含来自超过 10000 对电影角色之间 220000 多个对话交流。

“+++$+++” 用作语料库数据集中所有文件中字段的分隔符。

movie_conversations.txt 具有以下格式:第一个角色的 ID、第二个角色的 ID、发生此对话的电影的 ID,以及线路 ID 列表。角色和电影信息分别可以在 movie_characters_metadata.txtmovie_titles_metadata.txt 中找到。
u0 +++$+++ u2 +++$+++ m0 +++$+++ [‘L194’, ‘L195’, ‘L196’, ‘L197’]
u0 +++$+++ u2 +++$+++ m0 +++$+++ [‘L198’, ‘L199’]
u0 +++$+++ u2 +++$+++ m0 +++$+++ [‘L200’, ‘L201’, ‘L202’, ‘L203’]
u0 +++$+++ u2 +++$+++ m0 +++$+++ [‘L204’, ‘L205’, ‘L206’]
u0 +++$+++ u2 +++$+++ m0 +++$+++ [‘L207’, ‘L208’]
来自 movie_conversations.txt 的对话对示例 movie_lines.txt 具有以下格式:对话行的 ID、说出这句话的角色的 ID、电影的 ID、角色的姓名以及这句话的文本。
L901 +++$+++ u5 +++$+++ m0 +++$+++ KAT +++$+++ He said everyone was doing it. So I did it.
L900 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ As in…
L899 +++$+++ u5 +++$+++ m0 +++$+++ KAT +++$+++ Now I do. Back then, was a different story.
L898 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ But you hate Joey
L897 +++$+++ u5 +++$+++ m0 +++$+++ KAT +++$+++ He was, like, a total babe
来自 movie_lines.txt 的对话文本示例 我们将通过以下步骤构建输入管道
  • move_conversations.txtmovie_lines.txt 中提取对话对列表
  • 通过从每个句子中删除特殊字符来预处理每个句子。
  • 使用 TensorFlow Datasets SubwordTextEncoder 构建标记器(将文本映射到 ID,并将 ID 映射到文本)。
  • 标记每个句子,并在每个句子的开头和结尾添加 START_TOKENEND_TOKEN
  • 过滤掉包含超过 MAX_LENGTH 个标记的句子。
  • 将标记后的句子填充到 MAX_LENGTH
  • 使用标记后的句子构建 tf.data.Dataset
请注意,Transformer 是一个自回归模型,它一次预测一个部分,并使用它迄今为止的输出来决定下一步的操作。在训练期间,此示例使用教师强迫。教师强迫是指无论模型在当前时间步长预测什么,都将真实输出传递到下一个时间步长。

完整的预处理代码可以在 colab 笔记本的 准备数据集 部分找到。
i really , really , really wanna go , but i can t . not unless my sister goes .
i m workin on it . but she doesn t seem to be goin for him .
预处理后的对话对示例
来自 Attention is All You Need 的注意力架构图

注意力

与许多序列到序列模型一样,Transformer 也包含编码器和解码器。但是,Transformer 使用多头注意力层而不是循环层或卷积层,这些层由多个缩放点积注意力组成。

缩放点积注意力

缩放点积注意力函数接受三个输入:Q(查询)、K()、V()。用于计算注意力权重的方程为: 由于 softmax 归一化应用于 ,因此它的值决定了对 查询 给予多少重要性。输出表示注意力权重与 的乘积。这确保了我们想要关注的词保持原样,而无关的词被冲洗掉。
def scaled_dot_product_attention(query, key, value, mask):
  matmul_qk = tf.matmul(query, key, transpose_b=True)

  depth = tf.cast(tf.shape(key)[-1], tf.float32)
  logits = matmul_qk / tf.math.sqrt(depth)

  # add the mask zero out padding tokens.
  if mask is not None:
    logits += (mask * -1e9)

  attention_weights = tf.nn.softmax(logits, axis=-1)

  return tf.matmul(attention_weights, value)
缩放点积注意力层的实现

多头注意力层

顺序模型允许我们通过简单地将层堆叠在一起来快速构建模型;但是,对于更复杂和非顺序模型,需要函数式 API 和模型子类化。tf.keras API 允许我们混合使用不同的 API 样式。我最喜欢的模型子类化功能是调试功能。我可以在 call() 方法中设置断点,并像 numpy 数组一样观察每个层的输入和输出的值,这使得调试变得更加简单。

在这里,我们使用模型子类化来实现 MultiHeadAttention 层。

多头注意力包含四个部分
  • 线性层并拆分为头。
  • 缩放点积注意力。
  • 头的连接。
  • 最终线性层。

每个多头注意力块都接受一个字典作为输入,该字典包含查询、键和值。请注意,当使用模型子类化和函数式 API 时,输入必须保持为单个参数,因此我们必须将查询、键和值包装为字典。

然后,输入将经过密集层并拆分为多个头。上面定义的 scaled_dot_product_attention() 将应用于每个头(为了效率而进行广播)。必须在注意力步骤中使用适当的掩码。然后将每个头的注意力输出连接起来,并经过最终的密集层。

我们使用多个头而不是单个注意力头,因为查询、键和值被拆分为多个头,因为它允许模型从不同的表示空间中联合关注不同位置的信息。拆分后,每个头都具有降低的维度,因此总计算成本与具有完整维度的单个头注意力的成本相同。
class MultiHeadAttention(tf.keras.layers.Layer):

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    self.depth = d_model // self.num_heads

    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    self.dense = tf.keras.layers.Dense(units=d_model)

  def split_heads(self, inputs, batch_size):
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(inputs, perm=[0, 2, 1, 3])

  def call(self, inputs):
    query, key, value, mask = inputs['query'], inputs['key'], inputs[
        'value'], inputs['mask']
    batch_size = tf.shape(query)[0]

    # linear layers
    query = self.query_dense(query)
    key = self.key_dense(key)
    value = self.value_dense(value)

    # split heads
    query = self.split_heads(query, batch_size)
    key = self.split_heads(key, batch_size)
    value = self.split_heads(value, batch_size)

    scaled_attention = scaled_dot_product_attention(query, key, value, mask)

    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    outputs = self.dense(concat_attention)

    return outputs
使用模型子类化的多头注意力层的实现

Transformer

来自 Attention is All You Need 的 Transformer 架构图
Transformer 对编码器和解码器都使用堆叠的多头注意力和密集层。编码器将符号表示的输入序列映射到连续表示的序列。然后解码器接收连续表示并逐个元素生成符号的输出序列。

位置编码

由于 Transformer 不包含任何循环或卷积,因此添加了位置编码以向模型提供有关句子中词的相对位置的一些信息。
用于计算位置编码的公式
位置编码向量将添加到嵌入向量。嵌入将令牌表示在 d 维空间中,其中具有相似含义的令牌彼此更接近。但嵌入不编码句子中词的相对位置。因此,在添加位置编码后,词将基于其含义和在句子中的位置的相似性,在 d 维空间中彼此更接近。要详细了解位置编码,请查看此 教程.

我们使用模型子类化实现了位置编码,我们在 call() 中将编码矩阵应用于输入。
class PositionalEncoding(tf.keras.layers.Layer):

  def __init__(self, position, d_model):
    super(PositionalEncoding, self).__init__()
    self.pos_encoding = self.positional_encoding(position, d_model)

  def get_angles(self, position, i, d_model):
    angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
    return position * angles

  def positional_encoding(self, position, d_model):
    angle_rads = self.get_angles(
        position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
        i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
        d_model=d_model)
    # apply sin to even index in the array
    sines = tf.math.sin(angle_rads[:, 0::2])
    # apply cos to odd index in the array
    cosines = tf.math.cos(angle_rads[:, 1::2])

    pos_encoding = tf.concat([sines, cosines], axis=-1)
    pos_encoding = pos_encoding[tf.newaxis, ...]
    return tf.cast(pos_encoding, tf.float32)

  def call(self, inputs):
    return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
使用模型子类化的位置编码的实现

使用函数式 API 的 Transformer

使用函数式 API,我们可以像顺序模型一样堆叠层,但没有顺序模型的约束,并且不需要像模型子类化那样预先声明所有需要的变量和层。函数式 API 的一个优点是它在构建模型时会对其进行验证,例如检查每个层的输入和输出形状,并在出现不匹配时引发有意义的错误消息。

我们正在使用函数式 API 实现编码层、编码器、解码层、解码器和 Transformer 本身。

查看如何使用模型子类化从这个 教程 中实现相同的模型。

编码层

每个编码层包含子层
  • 多头注意力(带填充掩码)
  • 2 个密集层,然后是 dropout
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  attention = MultiHeadAttention(
      d_model, num_heads, name="attention")({
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': padding_mask
      })
  attention = tf.keras.layers.Dropout(rate=dropout)(attention)
  attention = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(inputs + attention)

  outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention + outputs)

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)
使用函数式 API 实现编码层
我们可以使用 tf.keras.utils.plot_model() 来可视化我们的模型。(查看 colab 笔记本上的所有模型图)
编码层的流程图

编码器

编码器包含
  • 输入嵌入
  • 位置编码
  • N 个编码层
输入通过嵌入,与位置编码相加。该求和的输出是编码层的输入。编码器的输出是解码器的输入。
def encoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            name="encoder"):
  inputs = tf.keras.Input(shape=(None,), name="inputs")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)

  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  for i in range(num_layers):
    outputs = encoder_layer(
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
        name="encoder_layer_{}".format(i),
    )([outputs, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)
使用函数式 API 实现编码器

解码层

每个解码层包含子层
  • 掩码多头注意力(带前瞻掩码和填充掩码)
  • 多头注意力(带填充掩码)。值和键接收编码器输出作为输入。查询接收来自掩码多头注意力子层的输出。
  • 2 个密集层,然后是 dropout
由于查询接收解码器第一个注意力块的输出,而键接收编码器输出,因此注意力权重代表了根据编码器输出赋予解码器输入的重要性。换句话说,解码器通过查看编码器输出并自注意力其自身的输出来预测下一个词。
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name="look_ahead_mask")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  attention1 = MultiHeadAttention(
      d_model, num_heads, name="attention_1")(inputs={
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': look_ahead_mask
      })
  attention1 = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention1 + inputs)

  attention2 = MultiHeadAttention(
      d_model, num_heads, name="attention_2")(inputs={
          'query': attention1,
          'key': enc_outputs,
          'value': enc_outputs,
          'mask': padding_mask
      })
  attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
  attention2 = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention2 + attention1)

  outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(outputs + attention2)

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)
使用函数式 API 实现解码层

解码器

解码器包含
  • 输出嵌入
  • 位置编码
  • N 个解码层
目标通过嵌入,与位置编码相加。该求和的输出是解码层的输入。解码器的输出是最终线性层的输入。
def decoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            name='decoder'):
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name='look_ahead_mask')
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
  
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)

  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  for i in range(num_layers):
    outputs = decoder_layer(
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
        name='decoder_layer_{}'.format(i),
    )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)
使用函数式 API 实现解码器

Transformer

Transformer 由编码器、解码器和最终线性层组成。解码器的输出是线性层的输入,其输出被返回。

enc_padding_maskdec_padding_mask 用于掩盖所有填充标记。look_ahead_mask 用于掩盖序列中的未来标记。由于掩码的长度随着不同的输入序列长度而改变,因此我们使用 Lambda 层来创建这些掩码。
def transformer(vocab_size,
                num_layers,
                units,
                d_model,
                num_heads,
                dropout,
                name="transformer"):
  inputs = tf.keras.Input(shape=(None,), name="inputs")
  dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

  enc_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='enc_padding_mask')(inputs)
  # mask the future tokens for decoder inputs at the 1st attention block
  look_ahead_mask = tf.keras.layers.Lambda(
      create_look_ahead_mask,
      output_shape=(1, None, None),
      name='look_ahead_mask')(dec_inputs)
  # mask the encoder outputs for the 2nd attention block
  dec_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='dec_padding_mask')(inputs)

  enc_outputs = encoder(
      vocab_size=vocab_size,
      num_layers=num_layers,
      units=units,
      d_model=d_model,
      num_heads=num_heads,
      dropout=dropout,
  )(inputs=[inputs, enc_padding_mask])

  dec_outputs = decoder(
      vocab_size=vocab_size,
      num_layers=num_layers,
      units=units,
      d_model=d_model,
      num_heads=num_heads,
      dropout=dropout,
  )(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

  outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)

  return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
使用函数式 API 实现 Transformer

训练模型

我们可以按如下方式初始化 Transformer
NUM_LAYERS = 2
D_MODEL = 256
NUM_HEADS = 8
UNITS = 512
DROPOUT = 0.1

model = transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT)
在定义损失函数、优化器和指标后,我们可以简单地使用 model.fit() 训练我们的模型。注意,我们必须掩盖我们的损失函数,以便忽略填充标记,我们还编写了自定义学习率。
def loss_function(y_true, y_pred):
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')(y_true, y_pred)

  mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
  loss = tf.multiply(loss, mask)

  return tf.reduce_mean(loss)

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):

  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()

    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

learning_rate = CustomSchedule(D_MODEL)

optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

def accuracy(y_true, y_pred):
  # ensure labels have shape (batch_size, MAX_LENGTH - 1)
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  accuracy = tf.metrics.SparseCategoricalAccuracy()(y_true, y_pred)
  return accuracy

model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])

EPOCHS = 20

model.fit(dataset, epochs=EPOCHS)

评估

为了评估,我们必须一次运行一个时间步长的推断,并将前一个时间步长的输出作为输入。

注意,我们通常不会在推断期间应用 dropout,但我们没有为我们的模型指定 training 参数。这是因为 trainingmask 已经为我们内置,如果我们想运行 model 进行评估,我们可以简单地调用 model(inputs, training=False) 以在推断模式下运行模型。
def evaluate(sentence):
  sentence = preprocess_sentence(sentence)

  sentence = tf.expand_dims(
      START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)

  output = tf.expand_dims(START_TOKEN, 0)

  for i in range(MAX_LENGTH):
    predictions = model(inputs=[sentence, output], training=False)

    # select the last word from the seq_len dimension
    predictions = predictions[:, -1:, :]
    predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

    # return the result if the predicted_id is equal to the end token
    if tf.equal(predicted_id, END_TOKEN[0]):
      break

    # concatenated the predicted_id to the output which is given to the decoder as its input.
    output = tf.concat([output, predicted_id], axis=-1)

  return tf.squeeze(output, axis=0)

def predict(sentence):
  prediction = evaluate(sentence)
  predicted_sentence = tokenizer.decode([i for i in prediction if i < tokenizer.vocab_size])
  return predicted_sentence
Transformer 评估实现
为了测试我们的模型,我们可以调用 predict(sentence)
>>> output = predict(‘Where have you been?’)
>>> print(output)
i don t know . i m not sure . i m a paleontologist .

总结

到这里,我们已经使用 TensorFlow 2.0 在大约 500 行代码中实现了 Transformer。

在本教程中,我们重点介绍了使用函数式 API 和模型子类化实现复杂模型的两种不同方法,以及如何将它们结合起来。

如果你想了解更多关于这两种不同方法及其优缺点的信息,请查看 TensorFlow 指南上的 何时使用函数式 API 部分。

尝试使用不同的数据集或超参数来训练 Transformer!感谢阅读。
下一篇文章
A Transformer Chatbot Tutorial with TensorFlow 2.0

Bryan M. LiFOR.ai 撰写的客座文章

目前,使用人工神经网络创建聊天机器人越来越流行,然而,教会计算机进行自然的对话非常困难,通常需要大型且复杂的语言模型。

随着 TensorFlow 2.0 中所做的所有更改和改进,我们可以轻松地构建复杂的模型。在本博文中,我们将演示…