2021 年 5 月 26 日 — 作者:Nikita Namjoshi,机器学习解决方案工程师当一台机器不够用时,是时候使用 TensorFlow 的 MultiWorkerMirroredStrategy 来加速训练和迭代了。在本教程式文章中,您将学习如何使用 AI Platform Training 在 Google Cloud Platform (GCP) 上启动多工作器训练作业。您还将学习 TensorFlow 如何分布数据并在多台机器之间实现同步数据并行的基本知识。虽然本文侧重于 GCP 上的托管解决方案,但您也可以完全在自己的硬件上使用开源软件来完成所有这些操作。
作者:Nikita Namjoshi,机器学习解决方案工程师
当一台机器不够用时,是时候使用 TensorFlow 的 MultiWorkerMirroredStrategy 来加速训练和迭代了。在本教程式文章中,您将学习如何使用 AI Platform Training 在 Google Cloud Platform (GCP) 上启动多工作器训练作业。您还将学习 TensorFlow 如何分布数据并在多台机器之间实现同步数据并行的基本知识。虽然本文侧重于 GCP 上的托管解决方案,但您也可以完全在自己的硬件上使用开源软件来完成所有这些操作。
如果您只有一块 GPU,TensorFlow 将使用此加速器来加快模型训练,而无需您做任何额外的工作。但是,如果您想通过在单台机器上使用多个 GPU 或多台机器(每台机器可能有多个 GPU)来获得额外的性能提升,那么您需要使用 tf.distribute,它是 TensorFlow 的用于跨多个设备运行计算的库。
开始使用分布式训练最简单的方法是在单台机器上使用多个 GPU 设备。来自 tf.distribute 模块的 TensorFlow 分布策略将管理跨所有 GPU 的数据分布和梯度更新的协调。如果您想了解有关在此场景中进行训练的更多信息,请查看 以前关于分布式训练基础知识的文章。
如果您已经掌握了单主机训练并希望进一步扩展,那么向集群中添加多台机器可以帮助您获得更大的性能提升。您可以使用仅包含 CPU 或每台机器都包含一个或多个 GPU 的机器集群。
在 GCP 上执行多工作器训练有很多方法。在本文中,我们将使用 AI Platform Training,因为它是启动分布式训练作业最快的方法,并且它还具有使其非常易于作为您生产管道一部分的附加功能。要使用此托管服务,您需要向程序添加一些额外的代码,并设置一个特定于 AI Platform 的配置文件。但是,您将不必忍受 GPU 驱动程序安装或集群管理的痛苦,这在分布式场景中可能非常具有挑战性。
tf.distribute 模块目前提供了两种用于多工作器训练的策略。在 TensorFlow 2.5 中,ParameterServerStrategy 是实验性的,而 MultiWorkerMirroredStrategy 是稳定的 API。
与它的单工作器对应物 MirroredStrategy 一样,MultiWorkerMirroredStrategy 是一种同步数据并行策略,您只需要更改几行代码即可使用。
但是,与 MirroredStrategy 不同的是,对于多工作器设置,TensorFlow 需要知道哪些机器是集群的一部分。这通常使用环境变量 TF_CONFIG 指定。
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief": ["host1:port"],
"worker": ["host2:port", "host3:port"],
},
"task": {"type": "worker", "index": 1}
})
在这个简单的 TF_CONFIG 示例中,“cluster” 键包含一个字典,其中包含所有机器的内部 IP 地址和端口。在 MultiWorkerMirroredStrategy 中,所有机器都被指定为 *工作器*,它们是执行复制计算的物理机器。除了每台机器都是工作器之外,还需要一台工作器承担一些额外的工作,例如保存检查点和将摘要文件写入 TensorBoard。这台机器被称为 *主节点*(或使用其弃用名称 *master*)。
将机器添加到 cluster 键后,下一步是设置 “task”。这指定了当前机器的任务类型和任务索引,它是集群字典中的一个索引。集群键在每台机器上都应该相同,但任务键将不同。
方便的是,在使用 AI Platform Training 时,TF_CONFIG 环境变量会为您设置在集群中的每台机器上,因此您无需担心此设置!
但是,如果您尝试使用例如在 Google Compute Engine 上运行的 3 个实例的多工作器作业,则需要在每台机器上设置此环境变量,如下所示。对于不是主节点的机器,TF_CONFIG 看起来相同,只是任务索引递增 1。
机器 1(主节点)
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief": ["host1:port"],
"worker": ["host2:port", "host3:port"],
},
"task": {"type": "chief", "index": 0}
})
机器 2
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief": ["host1:port"],
"worker": ["host2:port", "host3:port"],
},
"task": {"type": "worker", "index": 0}
})
机器 3
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief": ["host1:port"],
"worker": ["host2:port", "host3:port"],
},
"task": {"type": "worker", "index": 1}
})
当您的集群中只有少数机器时,设置此环境变量非常容易;但是,当您开始扩展时,您不希望手动将此变量分配给每台机器。如前所述,使用 AI Platform 的众多优势之一是这种协调会自动发生。您只需提供集群中的机器数量以及每台机器的 GPU 数量和类型。我们将在后面的部分进行此操作。
在此 Colab 笔记本中,您将找到用于在 木薯数据集 上训练 ResNet50 架构 的代码。在以下部分中,我们将回顾需要添加到程序中的新代码,以便在多台机器上进行分布式训练。
与 tf.distribute 模块中的任何策略一样,第一步是实例化策略。
strategy = tf.distribute.MultiWorkerMirroredStrategy()
请注意,存在一个限制,即 MultiWorkerMirroredStrategy 的实例需要在程序开始时创建。可能创建操作的代码应放在实例化策略之后。
接下来,您将模型变量的创建包装在策略的作用域内。此关键步骤告诉 TensorFlow 哪些变量应该在副本之间镜像。
with strategy.scope():
model = create_model()
model.compile(
loss='sparse_categorical_crossentropy',
optimizer=tf.keras.optimizers.Adam(0.0001),
metrics=['accuracy'])
最后,您需要根据集群中的副本数量缩放批次大小。这确保了每个副本在每一步都处理相同数量的示例。
per_replica_batch_size = 64
global_batch_size = per_replica_batch_size * strategy.num_replicas_in_sync
如果您以前使用过 MirroredStrategy,那么前面的步骤应该很熟悉。从一台机器上的同步数据并行移动到多台机器上的主要区别在于,现在需要将每一步结束时的梯度同步到机器中的所有 GPU *以及* 集群中的所有机器。跨机器同步的这一额外步骤会增加分布的开销。
在 TensorFlow 中,多工作器全减通信是通过 CollectiveOps 实现的。您不需要了解很多细节来执行成功且高效的训练作业,但从高层面上讲,集体操作是 TensorFlow 图中的单个操作,它可以根据硬件、网络拓扑和张量大小等因素自动选择全减算法。
在单工作器情况下,在每一步,您的数据集都会在机器上的副本之间进行划分。在多工作器情况下,此数据分割过程会稍微复杂一些。数据现在还需要 *分片*,这意味着每个工作器都会被分配整个数据集的一个子集。因此,在每一步,每个工作器都会处理来自非重叠数据集元素的全局批次大小。这种分片通过 tf.data.experimental.AutoShardPolicy 自动完成。
默认情况下,TensorFlow 首先会尝试按 FILE 对数据进行分片。这意味着如果您的数据存在于多个文件中,每个工作器将处理不同的文件,并在副本之间分割相应的数据。FILE 是默认的自动分片策略,因为 MultiWorkerMirroredStrategy 最适合非常大的数据集的使用案例,这些数据集很可能不在单个文件中。但是,如果文件数量不能被工作器数量整除,或者某些文件明显比其他文件长,则此选项会导致工作器处于空闲状态。
如果您的数据未存储在多个文件中,则 AutoShardPolicy 将回退到 DATA,这意味着 TensorFlow 将在所有工作器之间对元素进行自动分片。这可以防止潜在的工作器空闲情况,但缺点是整个数据集将在每个工作器上读取。您可以阅读有关不同策略的更多信息,并 查看分布式输入指南中的示例。
如果您不想使用默认的 AUTO 策略,可以使用以下代码设置所需的 AutoShardPolicy
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
train_data = train_data.with_options(options)
在多工作器情况下保存模型会稍微复杂一些,因为目标对于每个工作器来说都必须不同。主工作器将保存到所需的模型目录,而其他工作器将保存到临时目录。重要的是,这些临时目录必须是唯一的,以防止多个工作器写入相同的位置。保存可能包含集体操作,因此所有工作器都必须保存,而不仅仅是主工作器。
以下是一些实现预期的保存逻辑以及一些在训练完成后删除临时目录的清理工作的样板代码。请注意,model_path 是 Google Cloud Storage (GCS) 存储桶的名称,您的模型将在训练结束时保存到该存储桶。
model_path = {gs://path_to_your_gcs_bucket}
# Note that with MultiWorkerMirroredStrategy,
# the program is run on every worker.
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configurations.
# 1) In addition to `worker` tasks, a `chief` task type is used.
# The implementation demonstrated here is for this case.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. In this case, this function
# should be modified to
# return (task_type == 'worker' and task_id == 0) or task_type is None
return task_type == 'chief'
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
# Determine type and task of the machine from
# the strategy cluster resolver
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# Based on the type and task, write to the desired model path
write_model_path = write_filepath(model_path, task_type, task_id)
model.save(write_model_path)
我们已经介绍过的有关设置分布策略、分片数据和保存模型的所有内容,无论您是在 GCP、自己的硬件还是其他云平台上训练,都适用。
使用 AI Platform 的基本先决条件是您需要有一个启用了结算功能的 GCP 项目、启用了 AI Platform API 以及 足够的 AI Platform 配额。 如果这些步骤中的任何一个让您感到困惑,请 参考以前 的文章以了解 GCP 基础知识。
如果您已经熟悉在 AI Platform 上使用单个节点进行训练,那么您很可能能够轻松完成本节。我们将采用上一节中介绍的内容,并进行一些调整以符合 AI Platform 训练约定。所有代码都可以在 此 Github 仓库 中找到,但我们将在本节中详细介绍。
根据 AI Platform 约定,训练代码根据以下图表进行排列。task.py 文件包含执行训练作业的代码。本教程中的示例还包括一个 model.py 文件,其中包含模型的 Keras 函数式 API 代码。对于更复杂的生产应用程序,您可能会有额外的 util.py 或 setup.py 文件,您可以在下面的层次结构中看到它们的位置。
model.py 文件 可以在 Github 上找到。您可以看到,此文件仅包含构建 ResNet50 模型架构的代码。
task.py 文件 可以在 Github 上找到。此文件包含主函数,该函数将执行训练作业并保存模型。
def main():
args = get_args()
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
train_data, number_of_classes = create_dataset(global_batch_size)
with strategy.scope():
model = create_model(number_of_classes)
model.fit(train_data, epochs=args.epochs)
# Determine type and task of the machine from
# the strategy cluster resolver
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# Based on the type and task, write to the desired model path
write_model_path = write_filepath(args.job_dir, task_type, task_id)
model.save(write_model_path)
在此简单示例中,数据预处理直接在 task.py 文件中进行,但实际上,对于更复杂的数据处理,您可能希望将此代码拆分为一个单独的 data.py 文件,您可以将其导入 task.py 中(例如,如果您的预处理包括解析 TFRecord 文件)。
在这种情况下,我们明确将 AutoShardPolicy 设置为 DATA,因为 Cassava 数据集不是作为多个文件下载的。但是,如果我们没有将策略设置为 DATA,则默认的 AUTO 策略将生效,最终结果将相同。
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
train_data = train_data.with_options(options)
task.py 文件还会解析我们需要的任何命令行参数。在此简单示例中,纪元通过命令行传递。此外,我们需要解析参数 job-dir,即我们的模型将存储在其中的 GCS 存储桶。
def get_args():
'''Parses args.'''
parser = argparse.ArgumentParser()
parser.add_argument(
'--epochs',
required=True,
type=int,
help='number training epochs')
parser.add_argument(
'--job-dir',
required=True,
type=str,
help='bucket to save model')
args = parser.parse_args()
return args
最后,task.py 文件包含用于保存模型的样板代码。对于生产示例,您可能希望将此样板代码添加到 util.py 文件中,但对于此简单示例,我们将所有内容都保留在一个文件中。
AI Platform 提供 标准运行时 供您执行训练作业。虽然这些运行时可能适合您的用例,但更专业的需求需要自定义容器。在本节中,我们将介绍如何设置您的容器映像并将其推送到 Google Container Registry (GCR)。
以下 Dockerfile 指定了基本映像,使用 TensorFlow 2.5 Enterprise GPU 深度学习容器。使用 TensorFlow Enterprise 映像作为我们的基本映像为在 GCP 上开发提供了一个有用的设计模式。TensorFlow Enterprise 是 TensorFlow 的一个发行版,针对 GCP 进行了优化。您可以将 TensorFlow Enterprise 与 AI Platform Notebooks、深度学习虚拟机和 AI Platform 训练一起使用,从而在不同环境之间实现无缝过渡。
我们训练器目录中的代码被复制到 Docker 映像中,我们的入口点是 task.py 脚本,我们将以模块的形式运行它。
# Specifies base image and tag
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-5
WORKDIR /root
# Copies the trainer code to the docker image.
COPY trainer/ /root/trainer/
# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
接下来,我们将设置一些有用的环境变量。您可以为 IMAGE_REPO_NAME 和 IMAGE_TAG 选择您选择的任何名称。如果您尚未设置 Google Cloud SDK,您可以按照 此处的步骤 进行操作,因为您需要使用 gcloud 工具来推送您的容器并启动训练作业。
export PROJECT_ID=$(gcloud config list project --format "value(core.project)")
export IMAGE_REPO_NAME={your_repo_name}
export IMAGE_TAG={your_image_tag}
export IMAGE_URI=gcr.io/$PROJECT_ID/$IMAGE_REPO_NAME:$IMAGE_TAG
接下来,您将构建您的 Dockerfile。
docker build -f Dockerfile -t $IMAGE_URI ./
最后,您可以将您的映像推送到 GCR。
gcloud auth configure-docker
docker push $IMAGE_URI
如果您导航到 GCP 控制台 UI 中的 GCR 页面,您应该会看到您新推出的映像。
在我们可以启动训练作业之前,最后一步是设置集群。AI Platform 提供了一组预定义的集群规范,称为 规模层级,但我们需要为分布式训练提供我们自己的集群设置。
在以下 config.yaml 文件中,我们指定了一个主节点(相当于*chief*)和一个工作节点。每台机器都配备了一个 NVIDIA T4 Tensor Core GPU。对于这两台机器,您还需要将 imageUri 指定为您在上一步中推送到 GCR 的映像。
trainingInput:
scaleTier: CUSTOM
masterType: n1-standard-8
masterConfig:
acceleratorConfig:
count: 1
type: NVIDIA_TESLA_T4
imageUri: gcr.io/{path/to/image}:{tag}
useChiefInTfConfig: true
workerType: n1-standard-8
workerCount: 1
workerConfig:
acceleratorConfig:
count: 1
type: NVIDIA_TESLA_T4
imageUri: gcr.io/{path/to/image}:{tag}
如果您想知道 useChiefInTfConfig
标志的作用,TensorFlow 使用术语“Chief”,而 AI Platform 使用术语“Master”,因此此标志将管理这种差异。您无需担心细节(尽管如果您忘记设置此标志,您将看到错误消息!)。
您可以随意尝试此配置,添加机器、添加 GPU 或删除所有 GPU 并仅使用 CPU 进行训练。您可以查看 此处支持的 AI Platform 地区和 GPU 类型,因此请确保您的项目具有足够配额来满足您选择的任何配置。
您可以使用以下命令轻松启动您的训练作业
gcloud ai-platform jobs submit training {job_name} \
--region europe-west2 \
--config config.yaml \
--job-dir gs://{gcs_bucket/model_dir} -- \
--epochs 5 \
在上面的命令中,您需要为您的作业指定一个名称。除了传递区域之外,您还需要定义 job-dir,即您希望在训练完成后将保存的模型文件存储在 GCS 存储桶中的目录。
空的 -- 标志表示 gcloud 特定标志的结束和您要传递给应用程序的 args 的开始(在本例中,这只是纪元)。
执行训练命令后,您应该会看到以下消息。
您可以导航到 GCP 控制台中 AI Platform UI 并跟踪您的作业状态。
您会注意到,您的作业大约需要十分钟才能启动。在我们的简单示例中,这种开销可能看起来很大,因为它甚至不需要十分钟就能在一个 GPU 上进行训练。但是,对于大型作业,这种开销将被摊销。
作业完成训练后,您将在作业旁边看到一个绿色对勾。然后,您可以单击模型位置 URI,您将找到您的 saved_model.pb 文件。
现在,您了解了在 GCP 上启动多工作者训练作业的基础知识。您还了解了 MultiWorkerMirroredStrategy 的核心概念。为了将您的技能提升到下一个水平,尝试为您的下一个训练作业利用 AI Platform 的 超参数调整 功能(在开源中,您可以使用 Keras Tuner),或者使用 TFRecord 文件 作为您的输入数据。您也可以尝试 参数服务器策略,如果您想探索 TensorFlow 中的异步训练。祝您分布式训练愉快!
2021 年 5 月 26 日 — 作者:Nikita Namjoshi,机器学习解决方案工程师 当一台机器不够用时,是时候使用 TensorFlow 的 MultiWorkerMirroredStrategy 进行更快地训练和迭代了。在本教程式文章中,您将学习如何使用 AI Platform Training 在 Google Cloud Platform (GCP) 上启动多工作者训练作业。您还将学习 TensorFlow 如何分发数据和 i…