Google Article
TensorFlow 与 Apache Arrow 数据集
2019 年 8 月 23 日
作者:Bryan Cutler

Apache Arrow 为 TensorFlow 提供了一种高性能数据交换方式,这种方式既标准化,又针对分析和机器学习进行了优化。来自 TensorFlow I/O 的 Arrow 数据集提供了一种将 Arrow 数据直接引入 TensorFlow tf.data 的方法,这将与现有的输入管道和 tf.data.Dataset API 协同工作。

这篇博客将介绍不同的 Arrow 数据集,以及如何使用它们来馈送常见的 TensorFlow 工作负载。从 Pandas 作为示例输入源开始,将介绍每种 Arrow 数据集类型以展示其用法并描述最佳实践。然后将逐步完成一个完整的示例,以演示如何使用 Arrow 来进行 Keras 模型训练,首先在本地进行训练,然后轻松扩展到使用更大的远程数据集。最后一部分将讨论 Arrow 与 TensorFlow 未来将要推出的附加功能,以及更好地集成和改进 TensorFlow I/O 中基于列的处理的持续工作。

Apache Arrow 的核心是一个 标准格式,用于在内存中存储列式数据,它旨在提高系统之间的效率和互操作性。该项目提供了许多工具,使您能够轻松地在自己的应用程序中处理 Arrow 数据,并利用内置优化,无论您是使用一个还是多个 Python、C++、Java、Go 和 Rust 等库实现。

在 TensorFlow 中使用 Apache Arrow 有几个优点。首先,因为它是一个标准,所以无论数据来源是什么,它都能确保类型安全和数据完整性。其次,由于它是一种内存中格式,因此它允许系统之间交换数据,而无需实现序列化或为不同文件格式实现多个转换器。最后,Arrow 从一开始就被设计为在处理数据的各个方面都进行优化,从零拷贝读取到支持现代硬件上的加速操作。因此,您可以确保数据尽可能高效地处理,并且可以无缝地与任何规模的不同系统集成。

注意:本文中的示例使用 tensorflow 1.14.0(处于 Eager 模式)、tensorflow_io 0.8.0(预发布版)pyarrow 0.11.1 以及 sklearn 0.21.2

Arrow 数据集概述

Arrow 数据集是 tf.data.Dataset 的扩展,因此它们利用相同的 API 与 tf.data 管道集成,可以用作 tf.keras 的输入。目前,TensorFlow I/O 提供了 3 种 Arrow 数据集。它们分别是:ArrowDatasetArrowFeatherDatasetArrowStreamDataset。所有这些数据集都由相同的基础 Arrow 数据提供支持,该数据有两个重要的方面:数据是结构化的和批处理的。

结构化的 Arrow 数据

如前所述,Arrow 定义了一种 列式数据格式,并且使用架构来描述每列,包括名称、数据类型、位宽等。这意味着传入的 Arrow 数据是自描述的,并且可以自动推断数据类型和形状,从而确保 TensorFlow 无论数据来源是什么都能使用精确的类型规范。

Arrow 数据的格式与语言无关,旨在能够跨语言边界(例如,从 Java 到 C++)传输数据,而无需数据序列化或中间处理。

目前,TensorFlow I/O Arrow 数据集中仅支持原始数据类型,可以是标量或数组值。后者将转换为密集张量向量。

本机批处理 Arrow

当 Arrow 数据被分成由一组具有相同行数的列组成的记录批次时,它可以最有效地使用。然后可以通过 流或文件格式 交换这些批次。每个 Arrow 数据集都支持 batch_size 选项,该选项具有一个可选的 batch_mode,它可以是“keep_remainder”、“drop_remainder”或“auto”。“keep_remainder”和“drop_remainder”模式控制当数据集的末尾导致一个少于 batch_size 的部分批次时会发生什么。当使用“auto”模式时,批次大小会自动设置为传入的 Arrow 记录批次的大小,并且不需要设置 batch_size 选项。

在此处设置 batch_size(或使用“auto”模式)比使用 tf.data.Dataset.batch() 更有效,因为 Arrow 可以本机创建数据批次并使用它们来高效地将批次数据转换为张量。

创建示例 DataFrame 输入

在查看数据集之间的差异之前,让我们先创建一个示例 Pandas DataFrame,可以用作输入源
import numpy as np
import pandas as pd

data = {'label': np.random.binomial(1, 0.5, 10)}
data['x0'] = np.random.randn(10) + 5 * data['label']
data['x1'] = np.random.randn(10) + 5 * data['label']

df = pd.DataFrame(data)

print(df.head())
#   label        x0        x1
#0      1  5.241089  6.231621
#1      0  0.527365  0.966182
PyArrow 与 Pandas 的集成非常好,并且具有许多内置功能,可以高效地相互转换。Arrow 数据集在内部使用这些转换,下面的模型训练示例将展示如何做到这一点。

从 Arrow 内存创建数据集

ArrowDataset 使用已加载到内存中的 Arrow 数据。由于所有数据都必须驻留在内存中,因此仅建议对小型数据集使用此方法,并且在内存约束允许的情况下,它对于快速加载数据最有用。在 TensorFlow 图模式下,数据需要在操作中序列化,这会导致内存使用量激增。但是,在 TensorFlow Eager 模式下,并在本地进程中运行时,数据会自动从 Python 零拷贝交换到 C++ 内核。让我们使用上面的示例 DataFrame 作为输入来试一试
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)

# Make an iterator to the dataset
ds_iter = iter(ds)

# Print the first batch
print(next(ds_iter))
#(,
# )
数据集构造函数 from_pandas 将 Pandas DataFrame 作为第一个参数,batch_size 设置为 2,并且通过将 preserve_index 设置为 False 来省略 DataFrame 索引列。数据集的输出类型和形状可以从 Pandas DataFrame 架构中自动推断。

显示了第一批数据的输出。3 列产生了 3 个张量,由于 batch_size 为 2,因此每个张量的输出形状为 (2,)。

加载 Arrow Feather 文件

ArrowFeatherDataset 可以加载一组 Arrow Feather 格式的文件。Feather 是一种轻量级文件格式,它提供了一种简单而高效的方式将 Pandas DataFrames 写入磁盘,有关更多信息,请参阅 Arrow Feather 格式文档。目前,它仅限于原始标量数据,但在 Arrow 1.0.0 发布后,计划全面支持 Arrow 数据,以及与 R DataFrames 的互操作性。

如果您的工作负载处理许多 DataFrames 并且希望将数据写入磁盘,那么此数据集将是理想的选择。Arrow Feather 读取器/写入器旨在最大程度地提高加载/保存 Arrow 记录批次的性能。但是,如果您的文件旨在用于长期存储,那么其他列式格式(如 Apache Parquet)可能更适合。使用上面的示例 DataFrame,以下代码将把它保存为 Feather 文件,然后使用文件名列表创建一个数据集。
import tensorflow_io.arrow as arrow_io
from pyarrow.feather import write_feather

# Write the Pandas DataFrame to a Feather file
write_feather(df, '/path/to/df.feather')

# Create the dataset with one or more filenames
ds = arrow_io.ArrowFeatherDataset(
    ['/path/to/df.feather'],
    columns=(0, 1, 2),
    output_types=(tf.int64, tf.float64, tf.float64),
    output_shapes=([], [], []))

# Iterate over each row of each file
for record in ds:
   label, x0, x1 = record
   # use label and feature tensors
数据集构造函数的第一个参数是字符串或字符串列表,其中包含要读取的每个文件名。下一个参数 columns 允许通过索引选择某些列。最后,给出了 output_typesoutput_shapes。或者,可以使用 Arrow 架构以及备用构造函数 ArrowFeatherDataset.from_schema,它会自动推断张量的类型或形状。

读取 Arrow 批次流

ArrowStreamDataset 用于连接到一个或多个端点,这些端点以 Arrow 流格式提供 Arrow 记录批次。有关流格式的更多信息,请参阅 the_Arrow_stream_docs。流式传输批次是一种出色的方式来迭代可能不完全驻留在内存中的大型数据集(本地或远程)。在流式传输过程中,可以使用批次大小来限制内存使用量。目前支持的端点包括使用端点 :tcp://: 的 POSIX IPv4 套接字、使用端点 unix:// 的 Unix 域套接字以及使用端点 fd://0fd://- 的 STDIN。

上面的示例 Pandas DataFrame 也可以用作数据集的输入。这将在内部对 DataFrame 进行零拷贝切片,切片大小为 batch_size,将切片转换为 Arrow 记录批次,并在本地套接字上作为流提供。
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowStreamDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)
构造函数与上面的 ArrowDataset 示例几乎相同,但由于它被分成批次并作为流提供,因此内存使用量要少得多,这允许在内存中处理非常大的 DataFrames。构造函数还将接受 DataFrames 的序列或迭代器,只要架构相同即可。

模型训练 Arrow

也许展示 Arrow 的最佳方式是通过一个示例。在本节中,将使用 Arrow 来读取 CSV 数据并从中训练一个简单的分类模型。首先,将使用一个小数据样本在本地测试模型,直到结果令人满意。然后,通过一些补充,可以更改数据集以从更大的远程文件集中读取数据,并将数据处理从运行模型训练的机器卸载。

在本地训练模型

让我们首先定义模型和训练步骤。数据将采用上面示例 Pandas DataFrame 的形式,包含一个标签和 2 个特征列。为了构建分类器,我们将使用 Keras 创建一个简单的逻辑回归模型
def model_fit(ds):
  """Create and fit a Keras logistic regression model."""
  
  # Build the Keras model
  model = tf.keras.Sequential()
  model.add(tf.keras.layers.Dense(1, input_shape=(2,),
            activation='sigmoid'))
  model.compile(optimizer='sgd', loss='mean_squared_error',
                metrics=['accuracy'])

  # Fit the model on the given dataset
  model.fit(ds, epochs=5, shuffle=False)
  return model
现在让我们创建一个函数来将 CSV 文件读取到 Arrow 数据中,并处理批次以执行特征转换。这将使用 PyArrow CSV 读取器,该读取器经过高度优化,可以将数据读取到 Arrow 记录批次中。
def read_and_process(filename):
  """Read the given CSV file and yield processed Arrow batches."""

  # Read a CSV file into an Arrow Table with threading enabled and
  # set block_size in bytes to break the file into chunks for granularity,
  # which determines the number of batches in the resulting pyarrow.Table
  opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096)
  table = pyarrow.csv.read_csv(filename, opts)

  # Fit the feature transform
  df = table.to_pandas()
  scaler = StandardScaler().fit(df[['x0', 'x1']])

  # Iterate over batches in the pyarrow.Table and apply processing
  for batch in table.to_batches():
    df = batch.to_pandas()

    # Process the batch and apply feature transform
    X_scaled = scaler.transform(df[['x0', 'x1']])
    df_scaled = pd.DataFrame({'label': df['label'], 
                              'x0': X_scaled[:, 0],
                              'x1': X_scaled[:, 1]})
    batch_scaled = pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)
    
    yield batch_scaled
在 pyarrow CSV 读取器中设置 block_size 选项,将文件分成块以控制线程粒度并生成多个记录批次。然后迭代这些批次以执行特征转换,方法是首先将特征转换为 Pandas DataFrame,处理数据,然后将结果转换回 Arrow 记录批次。这些转换步骤中的每一个都在批次上非常高效地执行,因此从 Pandas 来回转换的成本很低。最后,我们不是一次返回所有批次,而是使用处理后的批次调用 yield,这将返回一个生成器,该生成器允许它用作输入流的迭代器。

下一步是构建 Arrow 数据集。我们将使用 ArrowStreamDataset,它的构造函数接受一个记录批次迭代器作为输入,并可以使用之前函数 read_and_process() 返回的生成器。
def make_local_dataset(filename):
  """Make a TensorFlow Arrow Dataset that reads from a local CSV file."""
 
  # Read the local file and get a record batch iterator
  batch_iter = read_and_process(filename)
  
  # Create the Arrow Dataset as a stream from local iterator of record batches
  ds = arrow_io.ArrowStreamDataset.from_record_batches(
    batch_iter,
    output_types=(tf.int64, tf.float64, tf.float64),
    batch_mode='auto',
    record_batch_iter_factory=partial(read_and_process, filename))

  # Map the dataset to combine feature columns to single tensor
  ds = ds.map(lambda l, x0, x1: (tf.stack([x0, x1], axis=1), l))
  return ds
构造函数 ArrowStreamDataset.from_record_batches 接受记录批次迭代器、output_type 定义和 batch_mode。使用的批处理模式为“auto”,它将自动创建批次大小等于传入 Arrow 记录批次大小的张量批次。这很有用,因为输入可以控制原始记录批次大小,但也可以指定不同的 batch_size

参数 record_batch_iter_factory 用于指定一个函数,该函数将初始化记录批次迭代器,以便在训练 epochs 期间多次使用它。最后一行添加了对 tf.data.Dataset.map() 的调用,该调用会将特征列堆叠成单个张量输出。

现在,模型可以通过运行上述两个函数在本地文件上进行训练。
ds = make_local_dataset(filename)
model = model_fit(ds)

print("Fit model with weights: {}".format(model.get_weights()))
# Fit model with weights:
# [array([[0.7793554 ], [0.61216295]], dtype=float32),
#  array([0.03328196], dtype=float32)]

扩展到远程数据集

利用已经完成的本地操作,进行一些小的调整,就可以从更大的远程数据集进行读取。这还允许将处理转移到服务器端,这样执行训练的机器就不需要加载数据和进行特征转换。相反,它可以更好地利用所有资源专注于训练。

首先,让我们将单个文件扩展到遍历 CSV 文件目录。这可以通过几行额外的 Python 代码完成。
def read_and_process_dir(directory):
  """Read a directory of CSV files and yield processed Arrow batches."""

  for f in os.listdir(directory):
    if f.endswith(".csv"):
      filename = os.path.join(directory, f)
      for batch in read_and_process(filename):
        yield batch
现在编写我们的服务函数,它将在 TCP 套接字上监听,读取每个文件,处理,然后将每个批次流式传输到客户端。
def serve_csv_data(ip_addr, port_num, directory):
  """
  Create a socket and serve Arrow record batches as a stream read from the
  given directory containing CVS files.
  """

  # Create the socket
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.bind((ip_addr, port_num))
  sock.listen(1)

  # Serve forever, each client will get one iteration over data
  while True:
    conn, _ = sock.accept()
    outfile = conn.makefile(mode='wb')
    writer = None
    try:

      # Read directory and iterate over each batch in each file
      batch_iter = read_and_process_dir(directory)
      for batch in batch_iter:

        # Initialize the pyarrow writer on first batch
        if writer is None:
          writer = pa.RecordBatchStreamWriter(outfile, batch.schema)

        # Write the batch to the client stream
        writer.write_batch(batch)

    # Cleanup client connection
    finally:
      if writer is not None:
        writer.close()
      outfile.close()
      conn.close()
  sock.close()
其中大部分是用于设置套接字服务器的样板代码。重要的是要展示正在使用 PyArrow RecordBatchStreamWriter,它将一系列 Arrow 记录批次写入输出流(在本例中为 TCP 套接字)。

为了构建 Arrow 数据集,将再次使用 ArrowStreamDataset,但构造函数将传递我们远程服务器的端点,而不是 from_record_batches。在内部,数据集内核将创建一个客户端连接,并开始通过套接字读取记录批次。
def make_remote_dataset(endpoint):
  """Make a TensorFlow Arrow Dataset that reads from a remote Arrow stream."""

  # Create the Arrow Dataset from a remote host serving a stream
  ds = arrow_io.ArrowStreamDataset(
      [endpoint],
      columns=(0, 1, 2),
      output_types=(tf.int64, tf.float64, tf.float64),
      batch_mode='auto')

  # Map the dataset to combine feature columns to single tensor
  ds = ds.map(lambda l, x0, x1: (tf.stack([x0, x1], axis=1), l))
  return ds
这里使用了一个单一的端点,但它也可以是包含多个端点的列表。其余部分与我们的本地数据集相同,除了不需要指定 record_batch_iter_factory 参数,因为服务器函数在客户端断开连接后会重复数据集,然后再次重新连接。

为了运行训练,可以在一个或多个远程进程中执行服务器函数 serve_csv_data(),并在一个单独的进程中执行 model_fit()。一个完整的示例,展示了本地和远程训练,可以在 gist arrow_model_training_example 中找到。

这个例子中一个重要的收获是,由于使用了 Arrow 作为数据格式,数据从 Python 服务器直接传输到数据集内核中的 C++ 客户端,而无需使用专有格式或保存到中间文件。如果 Python 服务器不再满足你的需求,可以将其更改为 Java 服务器,而无需修改模型训练端的任何内容。同样,如果你的数据需求超过了 CSV 格式,只需要修改服务器端以支持不同的数据源即可。

Arrow 和 TensorFlow I/O 的未来工作

在撰写本文时,Arrow 项目正在进行 1.0 版本的发布。这很重要,因为它将在整个 1.x 生命周期中提供兼容性保证。除此之外,在 Arrow 记录批次中添加对张量作为逻辑类型的支持,将允许将矩阵和高阶数据与标准列式类型混合,并更好地与许多 ML 工作流集成。跟踪问题 ARROW-1614ARROW-5819 以获取更新,或关注项目博客 https://arrow.apache.ac.cn/blog/

Arrow 与 TensorFlow I/O 的即将推出的改进包括添加一个 Arrow Flight 数据集,它将提供一个客户端,可以与 Arrow Flight 服务器连接,使用 Arrow 本地 RPC 框架传输数据。这将使 TensorFlow 连接网络上的其他分布式应用程序以交换 Arrow 数据变得轻而易举。目前,Arrow Flight 正在被强化为生产就绪,到 Arrow 1.0 版本应该会变得稳定。在 tensorflow/io/issues/398 上跟踪问题以获取更新。

此外,TensorFlow I/O 正在努力扩展 Arrow 和相关数据集(如 Apache Parquet、HDF5 和 JSON)的列式操作。这将使诸如分割、合并、选择列以及对不同列式数据集混合进行其他操作等成为可能。在 tensorflow/io/issues/315 上查看更多信息。

感谢 Maureen McElaney。

下一篇文章
Article Image Placeholder

Bryan Cutler 发表

Apache Arrow 为 TensorFlow 提供了一种高性能数据交换方式,它既标准化又针对分析和机器学习进行了优化。来自 TensorFlow I/O 的 Arrow 数据集提供了一种将 Arrow 数据直接导入 TensorFlow tf.data 的方式,它将与现有输入管道和 tf.data.Dataset API 协同工作。

本博客将介绍不同的 Arrow d…