创建自定义 TFX 执行器
2019 年 9 月 19 日
发布者 Kevin Haas李志涛, 和 Robert Crowe 代表 TFX 团队

TensorFlow Extended (TFX) 是一个用于创建生产就绪的 ML 管道的平台。TFX 由 Google 创建,为 Google 的 ML 服务和应用提供基础,我们已经为需要创建生产 ML 管道的每个人开放了 TFX 的源代码。

A diagram of a custom executor
TFX 可以通过多种方式扩展和定制,包括开发新的组件并将它们包含到您的管道中。TFX 管道是一系列 TFX 组件,每个组件执行一个单独的任务,这些任务按有向无环图 (DAG) 的顺序排列。在本篇文章中,我们将介绍一个示例来说明开发新的 TFX 组件的过程。请继续关注更多文章,我们将讨论扩展和定制 TFX 的其他方法。

场景

我们希望用一个包含 执行器 的新组件替换现有 TFX 训练器组件 的行为,该执行器会在 Google Cloud Platform (GCP) 上提交作业以运行相同的训练。由于上游和下游语义保持不变,我们将重用现有的训练器组件,并替换执行器的行为。

组件的结构

A diagram of a custom executor
TFX 组件包含三个主要部分
  • 驱动程序
  • 执行器
  • 发布者

驱动程序和发布者

驱动程序通过查询 ML Metadata (MLMD) 存储库为执行器提供元数据,发布者接收执行器的结果并更新元数据存储库。作为开发人员,您通常不需要直接与驱动程序和发布者交互,但驱动程序和发布者记录的消息可能在 调试过程中很有用

执行器

执行器是组件执行其处理的地方。作为开发人员,您将根据实现您正在使用的组件类型的类的要求,编写在执行器中运行的代码。例如,当您使用 Transform 组件时,您将需要开发一个 preprocessing_fn。执行器使用和创建工件,这些工件存储在元数据存储库中。

添加自定义执行器

创建自定义执行器

要创建自定义执行器,我们首先复制当前训练器 执行器 ,然后对我们的 自定义执行器 进行修改以在 Google Cloud AI Platform 上启动训练作业。执行器的大部分基本结构将保持不变,因为输入、输出和执行参数将保持相同。变化将体现在如何处理输入和生成输出。这是通过创建一个新的 Executor 类来实现的,该类扩展了 tfx.components.base.base_executor.BaseExecutor 并实现了 Do()
class Executor(base_executor.BaseExecutor):
  """Start a trainer job on Google Cloud AI Platform."""

  def Do(self, input_dict,
         output_dict,
         exec_properties):
    """Starts a trainer job on Google Cloud AI Platform.

不要忘记在继续下一步之前对其进行测试!我们创建了一个方便的 脚本 供您在将执行器投入生产之前尝试。您应该编写类似的代码来测试您的代码的单元测试。与任何生产软件部署一样,在为 TFX 开发时,您应该确保有良好的测试覆盖率和强大的 CI/CD 框架。

覆盖训练器组件使用的执行器

为了做到这一点,我们将用新的 自定义执行器 替换 TFX 使用的 默认训练器执行器 ,该执行器将在 Google Cloud AI Platform 上创建训练作业。这是通过可选的 executor_class 组件参数来实现的。
from tfx.extensions.google_cloud_ai_platform.trainer
import executor as ai_platform_trainer_executor
...
trainer = Trainer(
    ...,
   custom_executor_spec = executor_spec.ExecutorClassSpec(
         ai_platform_trainer_executor.Executor)
)
就是这样!现在,当训练器组件被工作流引擎调用时,它将运行自定义执行器而不是默认执行器,同时创建和使用与默认执行器相同的 ML Metadata 工件。

将组件参数传递给您的训练器

TFX 执行器是自包含的二进制文件,专注于运行 ML 管道的单个步骤。自定义执行器需要与所有其他 TFX 执行器相同的三个参数:input_dict、output_dict、exec_properties。有关这些参数语义的更多详细信息,请参见BaseExecutor类。
在处理流经 TFX 管道的数据时,通常您通常希望从input_dict中的工件 URI 读取输入数据,而且通常您可能希望将输出写入output_dict中的工件 URI。这可能包括读写多个拆分,例如在使用训练和评估拆分进行处理的情况下。
from tfx.types import artifact_utils

train_input_examples_uri = artifact_utils.get_split_uri(
  input_dict['my_input_data'], 'train')
eval_input_examples_uri = artifact_utils.get_split_uri(
  input_dict['my_input_data'], 'eval')

train_output_examples_uri = artifact_utils.get_split_uri(
  output_dict['my_output_data'], 'train')
eval_output_examples_uri = artifact_utils.get_split_uri(
  output_dict[‘my_output_data'], 'eval')
在上面的示例中,字典键my_input_datamy_output_data是在您要覆盖其执行器的组件的 ComponentSpec 中定义的。
class MyComponentSpec(tfx.types.ComponentSpec):
  PARAMETERS = {
      <...>
  }
  INPUTS = {
      'my_input_data':   
      ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      'my_output_data':
      ChannelParameter(type=standard_artifacts.Examples),                                                            
  }
拆分是在您要覆盖其执行器的组件的输出通道中定义的,通常是在构造函数中
output_data = tfx.types.Channel(
  type=standard_artifacts.Examples,
    artifacts=[
      standard_artifacts.Examples(split=split)
      for split in artifact.DEFAULT_EXAMPLE_SPLITS
    ])
spec = LabelingComponentSpec(
  input_data=input_data,
  my_output_data=output_data)
使用custom_config dict将其他参数传递给您的自定义训练器执行器。自定义执行器可以使用exec_properties.get(‘custom_config’).get(‘your_config_key’)检索这些参数。在下面的示例中,提交Google Cloud AI Platform 训练作业所需的所有其他参数都可以在_ai_platform_training_args中找到。
_ai_platform_training_args = {
   'pythonModule': None,  # Will be populated by TFX
   'args': None,  # Will be populated by TFX
   'region': _gcp_region,
   'jobDir': os.path.join(_output_bucket, 'tmp'),
   'project': ‘your GCP project id’
}

...
trainer = Trainer(
    ...,
    custom_config={'ai_platform_training_args':
    _ai_platform_training_args})   
}

将自定义训练器的输出链接到预期的输出工件

将自定义训练器连接起来以发出预期的输出对于下游组件的成功至关重要。对于 Google Cloud AI Platform 自定义训练器,我们将执行器输入参数序列化,以便它们可以作为 GCP 训练作业的一部分进行传输。由于 Google Cloud AI Platform (CAIP) 执行器正在将默认 TFX 执行器重定向到 Google Cloud AI Platform 上运行,因此两者都采用相同的 {经过转换的示例、transform_fn以及schema} 输入参数来创建 TF 模型。此示例中使用的自定义执行器提交了 CAIP 训练作业,该作业将调用(通过run_executor.py)默认 TFX 训练器作为 CAIP python 模块,有效地打开了从本地工作站到 CAIP 上运行 TFX 训练器的通道。
# Configure Google Cloud AI Platform job
training_inputs = exec_properties.get('custom_config',
    {}).pop('ai_platform_training_args')
executor_class_path = '%s.%s' % 
    (tfx_trainer_executor.Executor.__module__,
    tfx_trainer_executor.Executor.__name__)

# Start Google Cloud AI Platform job
return runner.start_cmle_training(input_dict, output_dict,
    exec_properties, executor_class_path, training_inputs)

使用您的自定义执行器远程运行管道

到目前为止,我们一直假设您的管道正在本地运行,使用您 $PYTHONPATH 中可用的代码。即将发布的博文将解释如何执行打包在容器中或作为 PyPI 包的自定义执行器。

相关主题

除了训练器,TFX ExampleGen 组件也支持执行器级别的自定义。ExampleGen 提供了一个通用组件和一个基本执行器,它们应用了 ML 最佳实践,例如数据混洗和一致/可配置的划分。
如果现有的 ExampleGen 组件无法满足您的需求,请创建一个新的Apache BeamPTransform 用于处理从输入拆分到 TF 示例的转换,TFX 将完成其余工作。ExampleGen 文档有更多详细信息。

有关更多信息

要详细了解 TFX,请查看TFX 网站,加入TFX 讨论组,并观看我们的TFX YouTube 播放列表,并订阅TensorFlow 频道。
下一篇文章
Creating a Custom TFX Executor

发布者Kevin Haas李志涛Robert Crowe代表 TFX 团队

TensorFlow Extended (TFX) 是一个用于创建生产就绪型 ML 管道的平台。TFX 由 Google 创建,为 Google 的 ML 服务和应用程序提供了基础,我们一直在开源 TFX,以供所有需要创建生产 ML 管道的人使用。
TFX 可以通过多种方式进行扩展和自定义,包括…