借助 Dataflow 针对大规模 ML 推理模式使用 TFX 推理
2021 年 5 月 06 日

由高级员工开发者倡导者 Reza Rokni 发帖

本篇博文系列的第一部分 中,我们讨论了高效部署机器学习模型以通过 Google Cloud Dataflow 进行推理的最佳实践和模式。除了其他技术以外,它还展示了高效的输入批处理,以及使用 shared.py 高效利用模型。

在这篇文章中,我们将介绍如何使用 RunInference API(来自 TensorFlow Extended (TFX) 的实用转换),它使我们无需手动实施 第一部分 中描述的模式。在批处理或流模式下构建生产推理管道时,你可以使用 RunInference 简化管道并减少技术债务。

涵盖以下四种模式

  • 使用 RunInference 发起 ML 预测调用。
  • 后处理 RunInference 结果在业务流程中,预测通常是多步骤流程的第一部分。在此,我们将结果处理成可以在下游使用的形式。
  • 附加关键字。除了传递给模型的数据外,通常还需要一个标识符(例如,物联网设备 ID 或客户标识符) - 即使它不用于模型本身,但会在后续流程中使用它。我们展示如何实现此操作。
  • 在同一管道中使用多个模型进行推理。通常,您可能需要在同一管道中运行多个模型,无论它们是并行执行还是作为一系列预测 - 处理 - 预测调用执行。我们逐步介绍一个简单的示例。

创建简单模型

为了说明这些模式,我们将使用简单的玩具模型,该模型让我们能够专注于为管道的输入和输出所需的数据工程。我们将对此模型进行训练以逼近乘以数字 5 的结果。

请注意,以下代码片段可以在笔记本环境中作为单元格运行。

步骤 1 - 设置库和导入

%pip install tfx_bsl==0.29.0 --quiet
import argparse

import tensorflow as tf
from tensorflow import keras
from tensorflow_serving.apis import prediction_log_pb2

import apache_beam as beam
import tfx_bsl
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public import tfxio
from tfx_bsl.public.proto import model_spec_pb2

import numpy

from typing import Dict, Text, Any, Tuple, List

from apache_beam.options.pipeline_options import PipelineOptions

project = "<your project>"
bucket = "<your bucket>"

save_model_dir_multiply = f'gs://{bucket}/tfx-inference/model/multiply_five/v1/'
save_model_dir_multiply_ten = f'gs://{bucket}/tfx-inference/model/multiply_ten/v1/'

步骤 2 - 创建示例数据

在此步骤中,我们创建了一个包含 0 到 99 范围的值和与每个值乘以 5 相对应的标签的小型数据集。

"""
Create our training data which represents the 5 times multiplication table for 0 to 99. x is the data and y the labels. 

x is a range of values from 0 to 99.
y is a list of 5x

value_to_predict includes a values outside of the training data
"""
x = numpy.arange(0, 100)
y = x * 5

步骤 3 - 创建简单模型,编译和拟合它

"""
Build a simple linear regression model.
Note the model has a shape of (1) for its input layer, it will expect a single int64 value.
"""
input_layer = keras.layers.Input(shape=(1), dtype=tf.float32, name='x')
output_layer= keras.layers.Dense(1)(input_layer)

model = keras.Model(input_layer, output_layer)
model.compile(optimizer=tf.optimizers.Adam(), loss='mean_absolute_error')
model.summary()

让我们教模型了解乘以 5 的操作。

model.fit(x, y, epochs=2000)

接下来,使用一些测试数据检查模型的性能如何。

value_to_predict = numpy.array([105, 108, 1000, 1013], dtype=numpy.float32)
model.predict(value_to_predict)

从以下结果来看,这个简单模型已经学会了它的 5 倍表,足以满足我们的需要!

OUTPUT: 

array([[ 524.9939],
       [ 539.9937],
       [4999.935 ],
       [5064.934 ]], dtype=float32)

步骤 4 - 将输入转换为 tf.example

在刚构建的模型中,我们使用了一个简单列表来生成数据并将数据传递给模型。在下一步中,我们通过在模型训练中使用 tf.example 对象使模型更加健壮。

tf.example 是从名称到张量的可序列化字典(或映射),它确保了即使向基础示例添加新功能,模型依然能够运行。使用 tf.example 还带来了一个好处,即以高效的序列化格式跨模型移植数据。

要将 tf.example 用于此示例,首先需要创建一个助手类 ExampleProcessor,该类用于序列化数据点。

class ExampleProcessor:
  
   def create_example_with_label(self, feature: numpy.float32,
                            label: numpy.float32)-> tf.train.Example:
       return tf.train.Example(
           features=tf.train.Features(
                 feature={'x': self.create_feature(feature),
                          'y' : self.create_feature(label)
                 }))

   def create_example(self, feature: numpy.float32):
       return tf.train.Example(
           features=tf.train.Features(
                 feature={'x' : self.create_feature(feature)})
           )

   def create_feature(self, element: numpy.float32):
       return tf.train.Feature(float_list=tf.train.FloatList(value=[element]))

使用 ExampleProcess 类,现在可以将内存中列表移到磁盘上。

# Create our labeled example file for 5 times table

example_five_times_table = 'example_five_times_table.tfrecord'

with tf.io.TFRecordWriter(example_five_times_table) as writer:
 for i in zip(x, y):
   example = ExampleProcessor().create_example_with_label(
       feature=i[0], label=i[1])
   writer.write(example.SerializeToString())

# Create a file containing the values to predict

predict_values_five_times_table = 'predict_values_five_times_table.tfrecord'

with tf.io.TFRecordWriter(predict_values_five_times_table) as writer:
 for i in value_to_predict:
   example = ExampleProcessor().create_example(feature=i)
   writer.write(example.SerializeToString())

将新示例存储在磁盘上的 TFRecord 文件中后,我们可以使用 Dataset API 预先准备数据,使模型可以消费它们。

RAW_DATA_TRAIN_SPEC = {
'x': tf.io.FixedLenFeature([], tf.float32),
'y': tf.io.FixedLenFeature([], tf.float32)
}

RAW_DATA_PREDICT_SPEC = {
'x': tf.io.FixedLenFeature([], tf.float32),
}

在特性规范就位后,我们可以像以前一样训练模型。

dataset = tf.data.TFRecordDataset(example_five_times_table) 
dataset = dataset.map(lambda e : tf.io.parse_example(e, RAW_DATA_TRAIN_SPEC)) 
dataset = dataset.map(lambda t : (t['x'], t['y'])) 
dataset = dataset.batch(100) 
dataset = dataset.repeat()
model.fit(dataset, epochs=500, steps_per_epoch=1)

我们使用 TFX 流水线 构建模型时,会自动执行这些步骤,不用像本文中这样手工构建模型。

步骤 5 - 保存模型

有了模型后,我们需要保存模型以便和 RunInference 转换一起使用。RunInference 接受 TensorFlow 保存的模型 pb 文件作为其配置的一部分。保存的模型文件必须存储在 RunInference 转换可以访问的位置。在笔记本中,这可以是本地文件系统;但要在 Dataflow 上运行流水线,该文件需要所有工作进程都可以访问,所以这里我们使用 GCP 存储分区。

请注意,tf.keras.models.save_model api 直接支持 gs:// 架构。

tf.keras.models.save_model(model, save_model_dir_multiply)

在开发过程中,能够检查已保存的模型文件的内容非常有用。为此,我们可以使用 TensorFlow 提供的 saved_model_cli。您可以从单元格运行此命令

!saved_model_cli show --dir {save_model_dir_multiply} --all

下面显示了保存的模型文件的简要输出。请注意接受浮点类型张量的签名定义 'serving_default'。我们将在下一部分将其更改为接受另一种类型。

OUTPUT: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['example'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: serving_default_example:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_1'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

RunInference 会将序列化的 tf.example 传递到模型,而不是在当前签名中看到的浮点类型张量。要实现此目的,我们还需要执行一个步骤来准备模型:创建特定签名。

签名是一项强大的功能,它们使我们能够控制调用程序如何与模型交互。来自 TensorFlow 的 文档

"可选签名参数会控制 obj 中哪些函数对使用 SavedModel 的程序可用,例如服务 API。Python 函数可能会用 @tf.function(input_signature=...) 进行装饰,并且直接作为签名传递,或者在使用 @tf.function 进行装饰的方法上调用 get_concrete_function 来实现延迟传递。"

在我们的例子中,下面的代码将创建接受名称为“examples”的tf.string数据类型的签名。然后将此签名保存到模型中,该模型将替换先前保存的模型。

@tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string , name='examples')])
def serve_tf_examples_fn(serialized_tf_examples):
 """Returns the output to be used in the serving signature."""
 features = tf.io.parse_example(serialized_tf_examples, RAW_DATA_PREDICT_SPEC)
 return model(features, training=False)

signature = {'serving_default': serve_tf_examples_fn}

tf.keras.models.save_model(model, save_model_dir_multiply, signatures=signature)

如果您再次运行 saved_model_cli 命令,您将看到输入签名已更改为 DT_STRING

模式 1:预测时的 RunInference

步骤 1 - 在流水线中使用 RunInference

模型准备就绪后,可以将 RunInference 转换插入 Apache Beam 流水线中。下面的流水线使用 TFXIO TFExampleRecord,它通过 RawRecordBeamSource() 将其转换为转换。已保存的模型位置和签名作为 SavedModelSpec 配置对象传递给 RunInference API。

pipeline = beam.Pipeline()

tfexample_beam_record = tfx_bsl.public.tfxio.TFExampleRecord(file_pattern=predict_values_five_times_table)

with pipeline as p:
   _ = (p | tfexample_beam_record.RawRecordBeamSource()
          | RunInference(
              model_spec_pb2.InferenceSpecType(
                  saved_model_spec=model_spec_pb2.SavedModelSpec(model_path=save_model_dir_multiply)))
          | beam.Map(print)
       )

请注意

您可以使用 RunInference 执行两种类型的推理

  • 从 SavedModel 实例进行进程内推理。在 inference_spec_type 中设置 saved_model_spec 字段时使用。
  • 使用服务端点进行远程推理。在 inference_spec_type 中设置 ai_platform_prediction_model_spec 字段时使用。

以下是输出摘要。此处的值难以解释,因为它们采用未处理的原始格式。在下一部分将对原始结果进行后处理。

OUTPUT: 

predict_log {
  request { 
model_spec { signature_name: "serving_default" }
                inputs {
      key: "examples"
... 
       string_val: "\n\022\n\020\n\007example\022\005\032\003\n\001i"
...
response {
    outputs {
      key: "output_0"
      value {
   ...
        float_val: 524.993896484375

模式 2:对 RunInference 结果进行后处理

RunInference API 返回 PredictionLog 对象,其中包含序列化输入和对模型调用的输出。能够访问输入和输出,使您能够在后处理过程中创建一个简单的元组,以供在管道中下游使用。同样值得注意的是,RunInference 会以透明的方式考虑该模型能够批量处理的功能(并出于性能目的执行批量推理)。

PredictionProcessor beam.DoFn 会获取 RunInference 的输出,并生成包含问题和答案的格式化文本作为输出。当然,在生产系统中,根据具体用途,输出通常会是 Tuple[input, output],或者仅仅是输出。

class PredictionProcessor(beam.DoFn):

   def process(
           self,
           element: prediction_log_pb2.PredictionLog):
       predict_log = element.predict_log
       input_value = tf.train.Example.FromString(predict_log.request.inputs['examples'].string_val[0])
       output_value = predict_log.response.outputs
       yield (f"input is [{input_value.features.feature['x'].float_list.value}] output is {output_value['output_0'].float_val}");

pipeline = beam.Pipeline()

tfexample_beam_record = tfx_bsl.public.tfxio.TFExampleRecord(file_pattern=predict_values_five_times_table)

with pipeline as p:
   _ = (p | tfexample_beam_record.RawRecordBeamSource()
          | RunInference(
              model_spec_pb2.InferenceSpecType(
                  saved_model_spec=model_spec_pb2.SavedModelSpec(model_path=save_model_dir_multiply)))
          | beam.ParDo(PredictionProcessor())
          | beam.Map(print)
       )

现在,输出既包含原始输入,又包含模型的输出值。

OUTPUT: 

input is [[105.]] output is [523.6328735351562]
input is [[108.]] output is [538.5157470703125]
input is [[1000.]] output is [4963.6787109375]
input is [[1013.]] output is [5028.1708984375]

模式 3:附加密钥

一种有用的模式是能够将信息(通常是唯一标识符)与输入传递给模型,并能够通过输出访问此标识符。例如,在物联网用例中,您可以将设备 ID 与传递给模型的输入数据相关联。通常,此类密钥对模型本身无用,因此不应传递到第一层。

RunInference 通过接受 Tuple[key, value] 和输出 Tuple[key, PredictLog] 为我们处理此事。

步骤 1 - 创建具有附加密钥的源

由于我们需要一个密钥,其中包含我们发送以进行预测的数据,因此在此步骤中,我们在 BigQuery 中创建一个具有两列的表:一列用于保存密钥,另一列用于保存测试值。

CREATE OR REPLACE TABLE
  maths.maths_problems_1 ( key STRING OPTIONS(description="A unique key for the maths problem"),
    value FLOAT64 OPTIONS(description="Our maths problem" ) );
INSERT INTO
  maths.maths_problems_1
VALUES
  ( "first_question", 105.00),
  ( "second_question", 108.00),
  ( "third_question", 1000.00),
  ( "fourth_question", 1013.00)

步骤 2 - 修改后处理器和管道

在此步骤中,我们会

  • 修改管道,使其从新的 BigQuery 源表读取
  • 添加 map 转换,将表格行转换为 Tuple[bytes, Example]
  • 修改推理后处理器,使其随着密钥一起输出结果
class PredictionWithKeyProcessor(beam.DoFn):

   def __init__(self):
       beam.DoFn.__init__(self)

   def process(
           self,
           element: Tuple[bytes, prediction_log_pb2.PredictionLog]):
       predict_log = element[1].predict_log
       input_value = tf.train.Example.FromString(predict_log.request.inputs['examples'].string_val[0])
       output_value = predict_log.response.outputs
       yield (f"key is {element[0]} input is {input_value.features.feature['x'].float_list.value} output is { output_value['output_0'].float_val[0]}" )

pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'gs://{bucket}/tmp'})
pipeline = beam.Pipeline(options=pipeline_options)

with pipeline as p:
 _ = (p | beam.io.gcp.bigquery.ReadFromBigQuery(table=f'{project}:maths.maths_problems_1')
         | beam.Map(lambda x : (bytes(x['key'], 'utf-8'), ExampleProcessor().create_example(numpy.float32(x['value']))))
         | RunInference(
             model_spec_pb2.InferenceSpecType(
                 saved_model_spec=model_spec_pb2.SavedModelSpec(model_path=save_model_dir_multiply)))
         | beam.ParDo(PredictionWithKeyProcessor())
         | beam.Map(print)
     )
key is b'first_question' input is [105.] output is 524.0875854492188
key is b'second_question' input is [108.] output is 539.0093383789062
key is b'third_question' input is [1000.] output is 4975.75830078125
key is b'fourth_question' input is [1013.] output is 5040.41943359375

模式 4:在同一管道中使用多个模型进行推理

在本系列的第一部分中,“从多个模型中合并结果”模式介绍了 Apache Beam 中的各种分支技术,这些技术使您可以通过多个模型运行数据。

这些技术可适用于 RunInference API,多个分支可以使用该 API 在管道中轻松使用相同的或不同的模型。这与层叠集聚的功能类似,但此处数据通过单个 Apache Beam DAG 流经多个模型。

并行使用多个模型进行推理

在此示例中,相同的数据通过两个不同的模型运行:我们一直用于乘以 5 的模型以及学习乘以 10 的新模型。

Example of data being ran through 2 different models
"""
Create multiply by 10 table.

x is a range of values from 0 to 100.
y is a list of x * 10

value_to_predict includes a values outside of the training data
"""
x = numpy.arange( 0, 1000)
y = x * 10

# Create our labeled example file for 10 times table

example_ten_times_table = 'example_ten_times_table.tfrecord'

with tf.io.TFRecordWriter( example_ten_times_table ) as writer:
 for i in zip(x, y):
   example = ExampleProcessor().create_example_with_label(
       feature=i[0], label=i[1])
   writer.write(example.SerializeToString())

dataset = tf.data.TFRecordDataset(example_ten_times_table) 
dataset = dataset.map(lambda e : tf.io.parse_example(e, RAW_DATA_TRAIN_SPEC)) 
dataset = dataset.map(lambda t : (t['x'], t['y'])) 
dataset = dataset.batch(100)
dataset = dataset.repeat() 

model.fit(dataset, epochs=500, steps_per_epoch=10, verbose=0)

tf.keras.models.save_model(model,
                           save_model_dir_multiply_ten,
                           signatures=signature)

我们拥有两个模型后,将它们应用到我们的源数据中。

pipeline_options = PipelineOptions().from_dictionary(
                                     {'temp_location':f'gs://{bucket}/tmp'})

pipeline = beam.Pipeline(options=pipeline_options)

with pipeline as p:
 questions = p | beam.io.gcp.bigquery.ReadFromBigQuery(
                                   table=f'{project}:maths.maths_problems_1')

 multiply_five = ( questions
             | "CreateMultiplyFiveTuple" >>
             beam.Map(lambda x : (bytes('{}{}'.format(x['key'],' * 5'),'utf-8'),
                                   ExampleProcessor().create_example(x['value'])))
            
             | "Multiply Five" >> RunInference(
                 model_spec_pb2.InferenceSpecType(
                 saved_model_spec=model_spec_pb2.SavedModelSpec(
                                           model_path=save_model_dir_multiply)))
     )
 multiply_ten = ( questions
         | "CreateMultiplyTenTuple" >>
         beam.Map(lambda x : (bytes('{}{}'.format(x['key'],'* 10'), 'utf-8'),
                              ExampleProcessor().create_example(x['value'])))
         | "Multiply Ten" >> RunInference(
             model_spec_pb2.InferenceSpecType(
             saved_model_spec=model_spec_pb2.SavedModelSpec(
                                         model_path=save_model_dir_multiply_ten)))
 )
 _ = ((multiply_five, multiply_ten) | beam.Flatten()
                                    | beam.ParDo(PredictionWithKeyProcessor())
                                    | beam.Map(print))
Output:

key is b'first_question * 5' input is [105.] output is 524.0875854492188
key is b'second_question * 5' input is [108.] output is 539.0093383789062
key is b'third_question * 5' input is [1000.] output is 4975.75830078125
key is b'fourth_question * 5' input is [1013.] output is 5040.41943359375
key is b'first_question* 10' input is [105.] output is 1054.333984375
key is b'second_question* 10' input is [108.] output is 1084.3131103515625
key is b'third_question* 10' input is [1000.] output is 9998.0908203125
key is b'fourth_question* 10' input is [1013.] output is 10128.0009765625

顺序中使用多个模型进行推理

在顺序模式下,数据会顺序发送到一个或多个模型,每个模型的输出链到下一个模型。

sequential pattern sending data to one or more models in sequence, with the output from each model chaining to the next model.

具体步骤如下

  1. 从 BigQuery 中读取数据
  2. 映射数据
  3. 使用乘以 5 的模型运行推理
  4. 处理结果
  5. 使用乘以 10 的模型运行推理
  6. 处理结果
pipeline_options = PipelineOptions().from_dictionary(
                                       {'temp_location':f'gs://{bucket}/tmp'})

pipeline = beam.Pipeline(options=pipeline_options)

def process_interim_inference(element : Tuple[
                                        bytes, prediction_log_pb2.PredictionLog
                                        ])-> Tuple[bytes, tf.train.Example]:
  
  key = '{} original input is {}'.format(
             element[0], str(tf.train.Example.FromString(
                 element[1].predict_log.request.inputs['examples'].string_val[0]
                 ).features.feature['x'].float_list.value[0]))
  
  value = ExampleProcessor().create_example(
              element[1].predict_log.response.outputs['output_0'].float_val[0])
  
  return (bytes(key,'utf-8'),value)

with pipeline as p:
  
 questions = p | beam.io.gcp.bigquery.ReadFromBigQuery(
                                   table=f'{project}:maths.maths_problems_1')

 multiply = ( questions
             | "CreateMultiplyTuple" >>
             beam.Map(lambda x : (bytes(x['key'],'utf-8'),
                                   ExampleProcessor().create_example(x['value'])))
             | "MultiplyFive" >> RunInference(
                 model_spec_pb2.InferenceSpecType(
                 saved_model_spec=model_spec_pb2.SavedModelSpec(
                                   model_path=save_model_dir_multiply)))
            
     )

 _ = ( multiply
         | "Extract result " >> 
         beam.Map(lambda x : process_interim_inference(x))
         | "MultiplyTen" >> RunInference(
             model_spec_pb2.InferenceSpecType(
             saved_model_spec=model_spec_pb2.SavedModelSpec(
                             model_path=save_model_dir_multiply_ten)))
         | beam.ParDo(PredictionWithKeyProcessor())
         | beam.Map(print)
 )
Output: 

key is b"b'first_question' original input is 105.0" input is [524.9771118164062] output is 5249.7822265625
key is b"b'second_question' original input is 108.0" input is [539.9765014648438] output is 5399.7763671875
key is b"b'third_question' original input is 1000.0" input is [4999.7841796875] output is 49997.9453125
key is b"b'forth_question' original input is 1013.0" input is [5064.78125] output is 50647.91796875

在 Dataflow 上运行管道

迄今为止,管道在本地运行,使用的是直接运行器,在使用默认配置运行管道时会隐式使用该运行器。可以通过传入包括 --runner. 在内的配置参数使用生产用 Dataflow 运行器来运行相同的示例。详细信息和示例可在此找到。

以下是在 Dataflow 服务中运行的多模型管道图的示例

example of the multimodel pipeline graph running on the Dataflow service

使用 Dataflow 运行器,您还可以访问管道监控以及 RunInference 转换中输出的指标。下表显示了其中的部分指标,您还可以从该库中获取更全面的列表。

Table showing Dataflow runner metrics

结论

在此系列博文第 II 部分中,我们探讨了在一些常见场景中使用 tfx-bsl RunInference 的情况,从标准推理到后期处理,再到在管道中的多个位置使用 RunInference API。

要了解更多信息,请参阅DataflowTFX文档,还可以尝试使用TFX 配合 Google Cloud AI 平台管道

鸣谢

如果没有 Dataflow TFX 和 TF 团队许多人的辛勤工作,所有这些都无从谈起。从 TFX 和 TF 团队尤其感谢 Konstantinos Katsiapis、Zohar Yahav、Vilobh Meshram、Jiayi Zhao、Zhitao Li 和 Robert Crowe。从 Dataflow 团队感谢 Ahmet Altay 一直以来的支持和投入。

下一篇帖子
Using TFX inference with Dataflow for large scale ML inference patterns

,高级员工开发倡导者 Reza Rokni 发布 在本博客系列的第 I 部分 中,我们针对在 Google Cloud Dataflow 中对机器学习模型进行有效推断进行部署的最佳实践和模式进行了讨论。除其他技术之外,还展示了输入的有效批处理以及 shared.py 的使用,以便有效利用模型。在本文中,我们逐步讲解了 RunInferen… 的使用方式