2019 年 8 月 23 日 - 作者:Bryan Cutler
Apache Arrow 为 TensorFlow 提供了一种高性能数据交换方式,这种方式既标准化,又针对分析和机器学习进行了优化。来自 TensorFlow I/O 的 Arrow 数据集提供了一种将 Arrow 数据直接引入 TensorFlow tf.data
的方法,这将与现有的输入管道和 tf.data.Dataset
API 协同工作。
这篇博客将介绍不同的 Arrow 数据集......
tf.data
的方法,这将与现有的输入管道和 tf.data.Dataset
API 协同工作。tf.data.Dataset
的扩展,因此它们利用相同的 API 与 tf.data
管道集成,可以用作 tf.keras
的输入。目前,TensorFlow I/O 提供了 3 种 Arrow 数据集。它们分别是:ArrowDataset
、ArrowFeatherDataset
和 ArrowStreamDataset
。所有这些数据集都由相同的基础 Arrow 数据提供支持,该数据有两个重要的方面:数据是结构化的和批处理的。batch_size
选项,该选项具有一个可选的 batch_mode
,它可以是“keep_remainder”、“drop_remainder”或“auto”。“keep_remainder”和“drop_remainder”模式控制当数据集的末尾导致一个少于 batch_size
的部分批次时会发生什么。当使用“auto”模式时,批次大小会自动设置为传入的 Arrow 记录批次的大小,并且不需要设置 batch_size
选项。batch_size
(或使用“auto”模式)比使用 tf.data.Dataset.batch()
更有效,因为 Arrow 可以本机创建数据批次并使用它们来高效地将批次数据转换为张量。import numpy as np
import pandas as pd
data = {'label': np.random.binomial(1, 0.5, 10)}
data['x0'] = np.random.randn(10) + 5 * data['label']
data['x1'] = np.random.randn(10) + 5 * data['label']
df = pd.DataFrame(data)
print(df.head())
# label x0 x1
#0 1 5.241089 6.231621
#1 0 0.527365 0.966182
PyArrow 与 Pandas 的集成非常好,并且具有许多内置功能,可以高效地相互转换。Arrow 数据集在内部使用这些转换,下面的模型训练示例将展示如何做到这一点。ArrowDataset
使用已加载到内存中的 Arrow 数据。由于所有数据都必须驻留在内存中,因此仅建议对小型数据集使用此方法,并且在内存约束允许的情况下,它对于快速加载数据最有用。在 TensorFlow 图模式下,数据需要在操作中序列化,这会导致内存使用量激增。但是,在 TensorFlow Eager 模式下,并在本地进程中运行时,数据会自动从 Python 零拷贝交换到 C++ 内核。让我们使用上面的示例 DataFrame 作为输入来试一试import tensorflow_io.arrow as arrow_io
ds = arrow_io.ArrowDataset.from_pandas(
df,
batch_size=2,
preserve_index=False)
# Make an iterator to the dataset
ds_iter = iter(ds)
# Print the first batch
print(next(ds_iter))
#(,
# )
数据集构造函数 from_pandas
将 Pandas DataFrame 作为第一个参数,batch_size
设置为 2,并且通过将 preserve_index
设置为 False 来省略 DataFrame 索引列。数据集的输出类型和形状可以从 Pandas DataFrame 架构中自动推断。batch_size
为 2,因此每个张量的输出形状为 (2,)。ArrowFeatherDataset
可以加载一组 Arrow Feather 格式的文件。Feather 是一种轻量级文件格式,它提供了一种简单而高效的方式将 Pandas DataFrames 写入磁盘,有关更多信息,请参阅 Arrow Feather 格式文档。目前,它仅限于原始标量数据,但在 Arrow 1.0.0 发布后,计划全面支持 Arrow 数据,以及与 R DataFrames 的互操作性。import tensorflow_io.arrow as arrow_io
from pyarrow.feather import write_feather
# Write the Pandas DataFrame to a Feather file
write_feather(df, '/path/to/df.feather')
# Create the dataset with one or more filenames
ds = arrow_io.ArrowFeatherDataset(
['/path/to/df.feather'],
columns=(0, 1, 2),
output_types=(tf.int64, tf.float64, tf.float64),
output_shapes=([], [], []))
# Iterate over each row of each file
for record in ds:
label, x0, x1 = record
# use label and feature tensors
数据集构造函数的第一个参数是字符串或字符串列表,其中包含要读取的每个文件名。下一个参数 columns
允许通过索引选择某些列。最后,给出了 output_types
和 output_shapes
。或者,可以使用 Arrow 架构以及备用构造函数 ArrowFeatherDataset.from_schema
,它会自动推断张量的类型或形状。ArrowStreamDataset
用于连接到一个或多个端点,这些端点以 Arrow 流格式提供 Arrow 记录批次。有关流格式的更多信息,请参阅 the_Arrow_stream_docs。流式传输批次是一种出色的方式来迭代可能不完全驻留在内存中的大型数据集(本地或远程)。在流式传输过程中,可以使用批次大小来限制内存使用量。目前支持的端点包括使用端点 :
或 tcp://:
的 POSIX IPv4 套接字、使用端点 unix://
的 Unix 域套接字以及使用端点 fd://0
或 fd://-
的 STDIN。batch_size
,将切片转换为 Arrow 记录批次,并在本地套接字上作为流提供。import tensorflow_io.arrow as arrow_io
ds = arrow_io.ArrowStreamDataset.from_pandas(
df,
batch_size=2,
preserve_index=False)
构造函数与上面的 ArrowDataset
示例几乎相同,但由于它被分成批次并作为流提供,因此内存使用量要少得多,这允许在内存中处理非常大的 DataFrames。构造函数还将接受 DataFrames 的序列或迭代器,只要架构相同即可。def model_fit(ds):
"""Create and fit a Keras logistic regression model."""
# Build the Keras model
model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(1, input_shape=(2,),
activation='sigmoid'))
model.compile(optimizer='sgd', loss='mean_squared_error',
metrics=['accuracy'])
# Fit the model on the given dataset
model.fit(ds, epochs=5, shuffle=False)
return model
现在让我们创建一个函数来将 CSV 文件读取到 Arrow 数据中,并处理批次以执行特征转换。这将使用 PyArrow CSV 读取器,该读取器经过高度优化,可以将数据读取到 Arrow 记录批次中。def read_and_process(filename):
"""Read the given CSV file and yield processed Arrow batches."""
# Read a CSV file into an Arrow Table with threading enabled and
# set block_size in bytes to break the file into chunks for granularity,
# which determines the number of batches in the resulting pyarrow.Table
opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096)
table = pyarrow.csv.read_csv(filename, opts)
# Fit the feature transform
df = table.to_pandas()
scaler = StandardScaler().fit(df[['x0', 'x1']])
# Iterate over batches in the pyarrow.Table and apply processing
for batch in table.to_batches():
df = batch.to_pandas()
# Process the batch and apply feature transform
X_scaled = scaler.transform(df[['x0', 'x1']])
df_scaled = pd.DataFrame({'label': df['label'],
'x0': X_scaled[:, 0],
'x1': X_scaled[:, 1]})
batch_scaled = pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)
yield batch_scaled
在 pyarrow CSV 读取器中设置 block_size
选项,将文件分成块以控制线程粒度并生成多个记录批次。然后迭代这些批次以执行特征转换,方法是首先将特征转换为 Pandas DataFrame,处理数据,然后将结果转换回 Arrow 记录批次。这些转换步骤中的每一个都在批次上非常高效地执行,因此从 Pandas 来回转换的成本很低。最后,我们不是一次返回所有批次,而是使用处理后的批次调用 yield,这将返回一个生成器,该生成器允许它用作输入流的迭代器。ArrowStreamDataset
,它的构造函数接受一个记录批次迭代器作为输入,并可以使用之前函数 read_and_process()
返回的生成器。def make_local_dataset(filename):
"""Make a TensorFlow Arrow Dataset that reads from a local CSV file."""
# Read the local file and get a record batch iterator
batch_iter = read_and_process(filename)
# Create the Arrow Dataset as a stream from local iterator of record batches
ds = arrow_io.ArrowStreamDataset.from_record_batches(
batch_iter,
output_types=(tf.int64, tf.float64, tf.float64),
batch_mode='auto',
record_batch_iter_factory=partial(read_and_process, filename))
# Map the dataset to combine feature columns to single tensor
ds = ds.map(lambda l, x0, x1: (tf.stack([x0, x1], axis=1), l))
return ds
构造函数 ArrowStreamDataset.from_record_batches
接受记录批次迭代器、output_type
定义和 batch_mode
。使用的批处理模式为“auto”,它将自动创建批次大小等于传入 Arrow 记录批次大小的张量批次。这很有用,因为输入可以控制原始记录批次大小,但也可以指定不同的 batch_size
。record_batch_iter_factory
用于指定一个函数,该函数将初始化记录批次迭代器,以便在训练 epochs 期间多次使用它。最后一行添加了对 tf.data.Dataset.map()
的调用,该调用会将特征列堆叠成单个张量输出。ds = make_local_dataset(filename)
model = model_fit(ds)
print("Fit model with weights: {}".format(model.get_weights()))
# Fit model with weights:
# [array([[0.7793554 ], [0.61216295]], dtype=float32),
# array([0.03328196], dtype=float32)]
def read_and_process_dir(directory):
"""Read a directory of CSV files and yield processed Arrow batches."""
for f in os.listdir(directory):
if f.endswith(".csv"):
filename = os.path.join(directory, f)
for batch in read_and_process(filename):
yield batch
现在编写我们的服务函数,它将在 TCP 套接字上监听,读取每个文件,处理,然后将每个批次流式传输到客户端。def serve_csv_data(ip_addr, port_num, directory):
"""
Create a socket and serve Arrow record batches as a stream read from the
given directory containing CVS files.
"""
# Create the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((ip_addr, port_num))
sock.listen(1)
# Serve forever, each client will get one iteration over data
while True:
conn, _ = sock.accept()
outfile = conn.makefile(mode='wb')
writer = None
try:
# Read directory and iterate over each batch in each file
batch_iter = read_and_process_dir(directory)
for batch in batch_iter:
# Initialize the pyarrow writer on first batch
if writer is None:
writer = pa.RecordBatchStreamWriter(outfile, batch.schema)
# Write the batch to the client stream
writer.write_batch(batch)
# Cleanup client connection
finally:
if writer is not None:
writer.close()
outfile.close()
conn.close()
sock.close()
其中大部分是用于设置套接字服务器的样板代码。重要的是要展示正在使用 PyArrow RecordBatchStreamWriter
,它将一系列 Arrow 记录批次写入输出流(在本例中为 TCP 套接字)。ArrowStreamDataset
,但构造函数将传递我们远程服务器的端点,而不是 from_record_batches
。在内部,数据集内核将创建一个客户端连接,并开始通过套接字读取记录批次。def make_remote_dataset(endpoint):
"""Make a TensorFlow Arrow Dataset that reads from a remote Arrow stream."""
# Create the Arrow Dataset from a remote host serving a stream
ds = arrow_io.ArrowStreamDataset(
[endpoint],
columns=(0, 1, 2),
output_types=(tf.int64, tf.float64, tf.float64),
batch_mode='auto')
# Map the dataset to combine feature columns to single tensor
ds = ds.map(lambda l, x0, x1: (tf.stack([x0, x1], axis=1), l))
return ds
这里使用了一个单一的端点,但它也可以是包含多个端点的列表。其余部分与我们的本地数据集相同,除了不需要指定 record_batch_iter_factory
参数,因为服务器函数在客户端断开连接后会重复数据集,然后再次重新连接。serve_csv_data()
,并在一个单独的进程中执行 model_fit()
。一个完整的示例,展示了本地和远程训练,可以在 gist arrow_model_training_example 中找到。
2019 年 8 月 23 日 — 由 Bryan Cutler 发表
Apache Arrow 为 TensorFlow 提供了一种高性能数据交换方式,它既标准化又针对分析和机器学习进行了优化。来自 TensorFlow I/O 的 Arrow 数据集提供了一种将 Arrow 数据直接导入 TensorFlow tf.data 的方式,它将与现有输入管道和 tf.data.Dataset API 协同工作。
本博客将介绍不同的 Arrow d…