TensorFlow Extended (TFX):使用 Apache Beam 进行大规模数据处理
2020 年 3 月 10 日
由 Google Cloud 开发者倡导 Reza Rokni 代表 TFX 和 Dataflow 团队发布

Beam reza
TFX 的核心任务是让模型能够从研究阶段迁移到生产阶段,创建和管理生产管道。许多模型将使用大量数据构建,需要多个主机并行工作,以满足生产管道对处理和服务的需要。

我们将利用从 Apache Beam 继承的能力,即 TFX 的数据处理框架,来看看如何在针对小型数据集开发的 TFX 管道上,为生产数据集进行扩展。

Apache Beam

Apache Beam 的起源可以追溯到 FlumeJava,它是 Google 使用的数据处理框架(在 FlumeJava 论文 (2010) 中讨论)。Google Flume 如今在 Google 内部广泛使用,包括 Google 内部 TFX 使用的数据处理框架。

Google Flume 是 Google Cloud Dataflow 开发的基础(在 2015 年发布)。Dataflow 的 SDK 在 2016 年作为 Apache Beam 开源。与 Google 内部 TFX 实现类似(在 TFX 论文 (2017) 中讨论),TFX 的外部版本使用了 Google Flume 的外部版本,即 Apache Beam。

Apache Beam 的可移植 API 层为 TFX 库提供支持(例如,TensorFlow 数据验证TensorFlow 变换TensorFlow 模型分析),在执行的有向无环图 (DAG) 上下文中。Apache Beam 管道可以在各种执行引擎或“运行器”上执行。可以在以下位置找到运行器及其功能的全面列表:

https://beam.apache.org/documentation/runners/capability-matrix/.

本博客中使用的运行器是 Dataflow,它与 Google Flume 共享大部分代码,并且正在进行进一步的统一。

下面我们可以看到 TFX 组件 ExampleGen 在 Dataflow 运行器上运行时创建的图表。

Apache Beam 的优势

能够选择不同的执行引擎是决定在 TFX 中使用 Apache Beam 的重要因素。可以在本地 DirectRunner 上进行开发,并将生产工作负载运行在生产运行器上。例如,生产 Apache Flink 运行器可以在本地数据中心运行,或者您可以使用 Dataflow 等完全托管的云运行器。
通过使用生产运行器,我们可以利用数万个内核,所有内核并行工作,以执行 TFX 库中完成的计算,而无需更改在管道开发过程中创建的核心代码。

我们将通过两个示例展示这种能力。首先,使用核心 TFX 库 AnalyzeAndTransformDataset,最后通过两个 TFX 组件 ExampleGenStatisticsGen

注意
BigQuery 和 Dataflow 是可收费服务,请确保在运行本博客中的任何示例之前了解成本影响。
https://cloud.google.com/bigquery/pricing
https://cloud.google.com/dataflow/pricing
https://cloud.google.com/storage/pricing/

TFX 库

TFX 管道组件基于 TFX 库构建。例如,TensorFlow 变换,它使用 Apache Beam。我们将使用两个不同的 Apache Beam 运行器来探索这个库。最初,将使用本地开发运行器 DirectRunner。之后,将进行一些细微的代码修改,以使用生产 Dataflow 运行器运行该示例。DirectRunner 是用于开发目的的轻量级运行器。它在本地运行,不需要分布式处理框架。

下面的示例管道取自教程 (预处理数据(初学者)),它提供了一个关于如何使用 TensorFlow 变换 (tf.Transform) 预处理数据的示例。

有关 preprocessing_fn, 的详细信息,请参考教程。目前,我们只需要知道它正在转换传递给该函数的数据点。

注意
本博客文章使用的环境
virtualenv tfx-beam --python=python3
source tfx-beam/bin/activate
pip install tfx
def main():
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))
  transformed_data, transformed_metadata = transformed_dataset
  print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
  print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))
if __name__ == '__main__':
  main()
注意
Apache Beam 使用 特殊的语法来定义和调用变换。例如,在这一行中
result = pass_this | 'name this step' >> to_this_call
正在调用方法 to_this_call 并传递名为 pass_this 的对象,并且 此操作将在堆栈跟踪中被称为 name this step

上面的示例将隐式地使用本地开发/测试运行器 DirectRunner。要从本地 DirectRunner 切换到 Dataflow,首先我们需要将 beam_impl.Context 包含在 beam.Pipeline 中。这使我们能够传递参数,例如“--runner”。对于快速本地测试,您可以使用 --runner 设置为 DirectRunner 运行下面的示例。
import apache_beam as beam

argv=['--runner=DirectRunner']

def main():
     with beam.Pipeline(argv=argv) as p:
       # Ignore the warnings
       with beam_impl.Context(temp_dir=tempfile.mkdtemp()):  
         input = p | beam.Create(raw_data)  
         transformed_dataset, transform_fn = (  
             (input, raw_data_metadata)
            | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))
         transformed_dataset[0] |"Print Transformed Dataset" >>  beam.Map(print)
     
if __name__ == '__main__':
  main()
接下来,我们将切换到使用 Dataflow 运行器。由于 Dataflow 是在 Google Cloud 上运行的完全托管的运行器,因此我们需要为管道提供一些环境信息。这包括 Google Cloud 项目以及管道使用的临时文件的存储位置。

注意
您必须设置正确的权限,才能将管道作业提交到 Dataflow 服务。
有关身份验证的更多信息,请访问:https://cloud.google.com/dataflow/docs/concepts/security-and-permissions

# Setup our Environment

## The location of Input / Output between various stages ( TFX Components )
## This will also be the location for the Metadata 

### Can be used when running the pipeline locally
#LOCAL_PIPELINE_ROOT =

### In production you want the input and output to be stored on non-local location
#GOOGLE_CLOUD_STORAGE_PIPELINE_ROOT=

#GOOGLE_CLOUD_PROJECT = 

#GOOGLE_CLOUD_TEMP_LOCATION = 

# Will need setup.py to make this work with Dataflow
#
# import setuptools
#
# setuptools.setup(
#   name='demo',
#   version='0.0',
#   install_requires=['tfx==0.21.1'],
#   packages=setuptools.find_packages(),)

SETUP_FILE = "./setup.py"

argv=['--project={}'.format(GOOGLE_CLOUD_PROJECT),
      '--temp_location={}'.format(GOOGLE_CLOUD_TEMP_LOCATION),
      '--setup_file={}'.format(SETUP_FILE),
      '--runner=DataflowRunner']
def main():
    with beam.Pipeline(argv=argv) as p:
        with beam_impl.Context(temp_dir=GOOGLE_CLOUD_TEMP_LOCATION):
            input = p | beam.Create(raw_data) 
            transformed_data, transformed_metadata = (
                (input, raw_data_metadata)
                | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

if __name__ == '__main__':
  main()
为了了解 TFX 隐藏了多少工作,下面是管道处理的图表的可视化表示。我们不得不缩小图像以使它全部显示,因为有很多变换!
visual representation of the graph that the TFX pipeline processed

使用 Beam 的 TFX 组件

接下来,让我们使用一些 TFX 组件,这些组件是由上面讨论的 TFX 库组成的。我们将使用 ExampleGen 来提取数据,以及 StatisticsGen 来生成数据的描述性统计信息。

ExampleGen

ExampleGen TFX 管道组件将数据提取到 TFX 管道中。它使用外部文件/服务来生成示例,这些示例将被其他 TFX 组件读取。它还将数据分成训练集和评估集,或者根据需要分成更多集合,并可以选择性地对数据集进行洗牌。该过程如下所示
  1. 将数据分成训练集和评估集(默认情况下,2/3 训练集 + 1/3 评估集)
  2. 将数据转换成 tf.Example 格式
  3. 将数据复制到 _tfx_root 目录中,以便其他组件可以访问它,以便其他组件可以访问它
BigQueryExampleGen 允许我们直接查询 BigQuery 中的数据。
def createExampleGen(query: Text):
    # Output 2 splits: train:eval=3:1.
    output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(
                                 name='train', hash_buckets=3),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
    return BigQueryExampleGen(query=query, output_config=output)
除了要运行的 SQL 查询之外,BigQueryExampleGen 代码还通过 SplitConfig 对象传递配置信息。

此示例的数据来自位于 BigQuery 公共数据集(Google Cloud 的数据仓库)上的公共芝加哥出租车行程数据集。

bigquery-public-data.chicago_taxi_trips.taxi_trips.

注意:您可以在以下位置找到有关 BigQuery 公共数据集的更多信息:https://cloud.google.com/bigquery/public-data/

下面的查询将以正确的格式提取数据,以便 ExampleGen 处理。
query="""
SELECT
pickup_community_area,
  fare,
  EXTRACT(MONTH FROM trip_start_timestamp)  trip_start_month,
  EXTRACT(HOUR FROM trip_start_timestamp)  trip_start_hour,
  EXTRACT(DAYOFWEEK FROM trip_start_timestamp)  trip_start_day,
  UNIX_Millis(trip_start_timestamp) trip_start_ms_timestamp,
  pickup_latitude,
  pickup_longitude,
  dropoff_latitude,
  dropoff_longitude,
  trip_miles,
  pickup_census_tract,
  dropoff_census_tract,
  payment_type,
  company,
  trip_seconds,
  dropoff_community_area,
  tips
FROM
  `bigquery-public-data.chicago_taxi_trips.taxi_trips`
LIMIT 100
"""
请注意,使用了 LIMIT 100,这将限制输出为 100 条记录,从而使我们能够快速测试代码的正确性。

StatisticsGen

StatisticsGen TFX 管道组件生成训练数据和评估数据的描述性统计信息,这些信息可以被其他管道组件使用。它对上一步 ExampleGen 的结果进行操作。
def createStatisticsGen(bigQueryExampleGen: BigQueryExampleGen):
    # Computes statistics over data for visualization and example validation.
    return StatisticsGen(examples=bigQueryExampleGen.outputs['examples'])
由于 StatisticsGen 需要 ExampleGen 的输出,因此这两个步骤之间存在依赖关系。这种生产者-消费者模式贯穿大多数生产 ML 管道。为了自动化这个管道,我们需要一些东西来协调这些依赖关系。

管道编排

一种解决方案是编写一个简单、轻量级的 python 脚本。但是,调试、故障模式、重试、日志记录等怎么办?

幸运的是,TFX 与两个管道编排引擎的集成已经解决了这个问题 - KubeflowApache Airflow.

除了这两个编排引擎之外,我们还可以再次使用 Apache Beam 作为编排器,因为依赖关系可以建模为 DAG。因此,我们可以使用一个 DAG,其中包含的变换本身也是 DAG。…“我们必须深入”…:-)。

使用哪个引擎取决于您的生产需求和要求,这超出了本博客的范围。目前,我们将通过 TFX 的 BeamDagRunner 使用 Apache Beam 进行编排。这意味着我们以两种不同的角色使用 Beam - 作为处理数据的执行引擎,以及作为对 TFX 任务进行排序的编排器。
# Used for setting up the orchestration 
from tfx.orchestration import pipeline
from tfx.orchestration import metadata
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
以下代码创建了我们的管道对象,准备由 BeamDagRunner 执行。
from typing import Text
from typing import Type

def createTfxPipeline(pipeline_name: Text, pipeline_root: Text, query: Text,
                      beam_pipeline_args) -> pipeline.Pipeline:
    output = example_gen_pb2.Output(
        # Output 2 splits: train:eval=3:1.
        split_config=example_gen_pb2.SplitConfig(splits=[
            example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
            example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
        ]))

    # Brings data into the pipeline or otherwise joins/converts training data.
    example_gen = BigQueryExampleGen(query=query, output_config=output)
    
    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

    return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen, statistics_gen
      ],
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          os.path.join(".", 'metadata', pipeline_name,'metadata.db')),
      enable_cache=False,
      additional_pipeline_args=beam_pipeline_args)
要测试该代码,请使用本地 DirectRunner 中的查询“LIMIT 100”。
tfx_pipeline = createTfxPipeline(
    pipeline_name="my_first_directRunner_pipeline",
    pipeline_root=LOCAL_PIPELINE_ROOT,
    query=query,
    beam_pipeline_args=                               {
        'beam_pipeline_args':[
            '--project={}'.format(GOOGLE_CLOUD_PROJECT),
            '--runner=DirectRunner']})
BeamDagRunner().run(tfx_pipeline)
您可以看到使用 tfdv 生成的结果,并将其输出到 LOCAL_PIPELINE_ROOT 中;
import os
import tensorflow_data_validation as tfdv

stats = tfdv.load_statistics(os.path.join(LOCAL_PIPELINE_ROOT,"StatisticsGen","statistics","","train","stats_tfrecord"))
tfdv.visualize_statistics(stats)
这对于一百条记录来说效果很好,但是如果目标是处理数据集中所有 187,002,0025 行怎么办?为此,我们将管道从 DirectRunner 切换到生产 Dataflow 运行器。还设置了一些额外的环境参数,例如运行管道的 Google Cloud 项目。
tfx_pipeline = createTfxPipeline(
    pipeline_name="my_first_dataflowRunner_pipeline",
    pipeline_root=GOOGLE_CLOUD_STORAGE_PIPELINE_ROOT,
    query=query,
    beam_pipeline_args={
    'beam_pipeline_args':[
        '--project={}'.format(GOOGLE_CLOUD_PROJECT)
,
    '--temp_location={}'.format(GOOGLE_CLOUD_TEMP_LOCATION),
    '--setup_file=./setup.py',
    '--runner=DataflowRunner']})
BeamDagRunner().run(tfx_pipeline)
BeamDagRunner 负责将 ExampleGenStatisticsGen 提交为独立的管道,并确保 ExampleGen 首先成功完成,然后再开始 StatisticsGen。 Dataflow 服务会自动负责启动工作器、自动伸缩、在工作器发生故障时重试、集中式日志记录以及监控。 自动伸缩基于各种信号,包括吞吐率,如下所示; Dataflow 监控控制台会显示有关管道的各种指标,例如工作器的 CPU 使用率。 下面我们看到机器的使用率随着它们上线而不断提高,大多数工作器的使用率始终保持在 90% 以上: Apache Beam 支持自定义计数器,这允许开发人员在他们的管道中创建指标。 TFX 团队利用这一点为各种组件创建了有用的信息计数器。 下面我们可以看到 StatisticsGen 运行期间记录的一些计数器。 过滤关键字“num_*_feature”,大约有十亿个整数和浮点数特征值。

总结

在这篇博文中,我们展示了 TFX 如何利用 Apache Beam 使您能够从开发环境切换到生产基础设施,而无需更改核心代码。 我们从 TFX 库开始,然后转向一个包含两个核心 TFX 管道组件 ExampleGen 和 StatisticsGen 的管道。

了解更多信息

要了解更多关于 TFX 的信息,请查看 TFX 网站,加入 TFX 讨论组,阅读 TFX 博客,观看我们的 TFX YouTube 播放列表 以及 订阅 TensorFlow 频道。
下一篇文章
TensorFlow Extended (TFX): Using Apache Beam for large scale data processing

作者:Reza Rokni,Google Cloud 开发者布道师,代表 TFX 和 Dataflow 团队


TFX 的核心使命是允许将模型从研究转移到生产,创建和管理生产管道。 许多模型将使用大量数据构建,需要多个主机并行工作以满足生产管道的处理和服务需求。

Us…