使用 tf.keras 训练和服务 ML 模型
2018 年 8 月 17 日
Stijn Decubber 发布,ML6 的机器学习工程师。

TensorFlow with Keras banner
Keras 是一个运行在多个后端之上的神经网络高级接口。它的函数式 API 非常用户友好,但同时灵活度也足够高,可以构建各种应用程序。Keras 在推出后迅速获得了关注,并在 2017 年,Keras API 被集成到 TensorFlow 核心,成为 tf.keras。虽然 tf.keras 和 Keras 拥有独立的代码库,但它们紧密耦合,并且随着 TensorFlow 1.9 的 更新的文档程序员指南 tf.keras 显然是使用 TensorFlow 构建神经网络时要寻找的高级 API

在本博文中,我们将逐步介绍使用 tf.keras 训练导出服务神经网络的过程。作为示例,我们将使用 Kaggle Planet 数据集训练一个卷积神经网络,以预测亚马逊森林卫星图像的标签。目标是说明现实世界用例的端到端管道。代码可在 github 上以可运行的笔记本形式获取。请注意,您需要安装最新版本的 TensorFlow 1.11.0(夜间构建版)才能一直跟进。这只是一个 pip install,并且 repo 中提供了 requirements.txt 文件。或者,您可以在 Google Colab 上开箱即用!

数据可在 Kaggle 上下载。训练数据包含大约 40000 张标记的亚马逊雨林图像。每张图像都与多个标签相关联
  • 恰好一个“天气”标签:晴朗、薄雾、多云或部分多云
  • 一个或多个“地面”标签:农业、裸地、居民区、道路、水域……

天气标签:多云。地面标签:原始森林、道路。
一个包含图像名称以及天气和地面标签(以二进制向量编码)的 Pandas DataFrame,以 .csv 文件的形式提供,可从 github 获取。
我们想要训练一个模型,能够准确地预测新图像的这些标签。我们将尝试使用一个具有两个独立输出(分别对应天气和地面标签)的网络来实现。预测天气标签是多类别分类问题的一个例子,而地面标签可以建模为多标签分类问题。因此,两个输出的损失函数将不同。在模型训练完成后,我们将导出它并使用 TensorFlow Serving 提供服务,以便我们可以通过 HTTP 发送请求,以获取图像的预测结果。

. . .

指定模型

我们将从头开始构建自己的模型¹。我们将采用一些经典的配置,包括一些卷积层、relu 激活以及顶部的两个密集分类器。
import tensorflow as tf
IM_SIZE = 128

image_input = tf.keras.Input(shape=(IM_SIZE, IM_SIZE, 3), name='input_layer')

# Some convolutional layers
conv_1 = tf.keras.layers.Conv2D(32,
                                kernel_size=(3, 3),
                                padding='same',
                                activation='relu')(image_input)
conv_1 = tf.keras.layers.MaxPooling2D(padding='same')(conv_1)
conv_2 = tf.keras.layers.Conv2D(32,
                                kernel_size=(3, 3),
                                padding='same',
                                activation='relu')(conv_1)
conv_2 = tf.keras.layers.MaxPooling2D(padding='same')(conv_2)

# Flatten the output of the convolutional layers
conv_flat = tf.keras.layers.Flatten()(conv_2)

# Some dense layers with two separate outputs
fc_1 = tf.keras.layers.Dense(128,
                             activation='relu')(conv_flat)
fc_1 = tf.keras.layers.Dropout(0.2)(fc_1)
fc_2 = tf.keras.layers.Dense(128,
                             activation='relu')(fc_1)
fc_2 = tf.keras.layers.Dropout(0.2)(fc_2)

# Output layers: separate outputs for the weather and the ground labels
weather_output = tf.keras.layers.Dense(4,
                                       activation='softmax',
                                       name='weather')(fc_2)
ground_output = tf.keras.layers.Dense(13,
                                      activation='sigmoid',
                                      name='ground')(fc_2)

# Wrap in a Model
model = tf.keras.Model(inputs=image_input, outputs=[weather_output, ground_output])
我们有两个输出层,因此在指定模型时应该将它们作为输出列表传递。请注意天气和地面输出层的不同激活函数。方便的是,tf.keras 中的 Model 实现带有实用的 summary() 方法:Keras 模型 在编译模型时,可以将两个不同的损失函数作为字典提供,该字典将张量名称映射到损失。
model.compile(optimizer='adam',
              loss={'weather': 'categorical_crossentropy',
                    'ground': 'binary_crossentropy'})
编译模型会用随机权重对其进行初始化,并允许我们选择一种优化算法来训练网络。

[1] 由于 Kaggle 竞赛期间的情况,使用大型预训练网络进行迁移学习是成功的关键之一。这里不是要击败 Kaggle。有关如何实现排行榜顶端性能的技巧,请查看处理此数据集的优秀 fast.ai 讲座

. . .

模型训练

让我们训练模型!我将在我的笔记本电脑上训练此模型,笔记本电脑没有足够的 RAM 将整个数据集加载到内存中。对于图像数据,这种情况非常普遍。Keras 提供了 model.fit_generator() 方法,可以使用自定义 Python 生成器从磁盘生成图像进行训练。但是,从 Keras 2.0.6 开始,我们可以使用 Sequence 对象代替生成器,它允许进行安全的 multiprocessing,这意味着速度显著提升,并且如果拥有 GPU,则可以降低出现瓶颈的风险。Keras 文档已经提供了良好的示例代码,我将对其进行一些自定义,以
  • 使其能够与将图像名称映射到标签的 DataFrame 配合使用
  • 在每个 epoch 之后对训练数据进行洗牌
import ast
import numpy as np
import math
import os
import random
from tensorflow.keras.preprocessing.image import img_to_array as img_to_array
from tensorflow.keras.preprocessing.image import load_img as load_img

def load_image(image_path, size):
    # data augmentation logic such as random rotations can be added here
    return img_to_array(load_img(image_path, target_size=(size, size))) / 255.

class KagglePlanetSequence(tf.keras.utils.Sequence):
    """
    Custom Sequence object to train a model on out-of-memory datasets. 
    """
    
    def __init__(self, df_path, data_path, im_size, batch_size, mode='train'):
        """
        df_path: path to a .csv file that contains columns with image names and labels
        data_path: path that contains the training images
        im_size: image size
        mode: when in training mode, data will be shuffled between epochs
        """
        self.df = pd.read_csv(df_path)
        self.im_size = im_size
        self.batch_size = batch_size
        self.mode = mode

        # Take labels and a list of image locations in memory
        self.wlabels = self.df['weather_labels'].apply(lambda x: ast.literal_eval(x)).tolist()
        self.glabels = self.df['ground_labels'].apply(lambda x: ast.literal_eval(x)).tolist()
        self.image_list = self.df['image_name'].apply(lambda x: os.path.join(data_path, x + '.jpg')).tolist()

    def __len__(self):
        return int(math.ceil(len(self.df) / float(self.batch_size)))

    def on_epoch_end(self):
        # Shuffles indexes after each epoch
        self.indexes = range(len(self.image_list))
        if self.mode == 'train':
            self.indexes = random.sample(self.indexes, k=len(self.indexes))

    def get_batch_labels(self, idx): 
        # Fetch a batch of labels
        return [self.wlabels[idx * self.batch_size: (idx + 1) * self.batch_size],
                self.glabels[idx * self.batch_size: (idx + 1) * self.batch_size]]

    def get_batch_features(self, idx):
        # Fetch a batch of images
        batch_images = self.image_list[idx * self.batch_size: (1 + idx) * self.batch_size]
        return np.array([load_image(im, self.im_size) for im in batch_images])

    def __getitem__(self, idx):
        batch_x = self.get_batch_features(idx)
        batch_y = self.get_batch_labels(idx)
        return batch_x, batch_y
Sequence 对象可与 fit_generator() 配合使用,代替自定义生成器来训练模型。请注意,无需提供每个 epoch 的步数,因为 __len__ 方法为生成器实现了该逻辑。
seq = KagglePlanetSequence('./KagglePlanetMCML.csv',
                       './data/train/',
                       im_size=IM_SIZE,
                       batch_size=32)
此外,tf.keras 提供对所有可用 Keras 回调的访问权限,这些回调可用于增强训练循环。它们的功能非常强大,并提供早期停止、学习率调度、存储 TensorBoard 文件等选项。这里,我们将使用 ModelCheckPointcallback 在每个 epoch 之后保存模型,以便我们之后可以恢复训练。默认情况下,模型架构、训练配置、优化器状态和权重都会被存储,这样一来,整个模型就可以从单个文件重新创建。让我们训练模型一个 epoch。
callbacks = [
    tf.keras.callbacks.ModelCheckpoint('./model.h5', verbose=1)
]

model.fit_generator(generator=seq,
                    verbose=1, 
                    epochs=1,
                    use_multiprocessing=True,
                    workers=4,
                    callbacks=callbacks)
Epoch 1/1
Epoch 00001: saving model to ./model.h5
1265/1265 [==============================] - 941s 744ms/step - loss: 0.8686 - weather_loss: 0.6571 - ground_loss: 0.2115
让我们训练模型一个 epoch:假设我们想在后期微调模型,我们可以简单地读取模型文件并恢复训练,无需重新编译。
another_model = tf.keras.models.load_model('./model.h5')
another_model.fit_generator(generator=seq, verbose=1, epochs=1)
最后,最好验证我们的 Sequence 是否有效地遍历了所有数据,方法是在测试模式下(即,不进行洗牌)实例化 Sequence,并使用它对整个数据集进行预测。
test_seq = KagglePlanetSequence('./KagglePlanetMCML.csv',
                       './data/train/',
                       im_size=IM_SIZE,
                       batch_size=32, mode='test')
predictions = model.predict_generator(generator=test_seq, verbose=1)
len(predictions[1])  == len(df_train) # This is True!

等等,Dataset API 呢?

tf.data API 是一个功能强大的库,它允许从各种来源获取数据并将其传递给 TensorFlow 模型。我们可以使用 tf.data API 而不是 Sequence 对象来训练 tf.keras 模型吗?可以。首先,让我们将图像和标签一起序列化到 TFRecord 文件中,这是在 TensorFlow 中序列化数据的推荐格式。
# Serialize images, together with labels, to TF records
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

tf_records_filename = './data/KagglePlanetTFRecord_{}'.format(IM_SIZE)
writer = tf.python_io.TFRecordWriter(tf_records_filename)

# List of image paths, np array of labels
im_list = [os.path.join('./data/train', v + '.jpg') for v in df_train['image_name'].tolist()]
w_labels_arr = np.array([ast.literal_eval(l) for l in df_train['weather_labels']])
g_labels_arr = np.array([ast.literal_eval(l) for l in df_train['ground_labels']])

# Loop over images and labels, wrap in TF Examples, write away to TFRecord file
for i in range(len(df_train)):
    w_labels = w_labels_arr[i].astype(np.float32)
    g_labels = g_labels_arr[i].astype(np.float32)
    im = np.array(img_to_array(load_img(im_list[i], target_size=(IM_SIZE, IM_SIZE))) / 255.)
    
    example = tf.train.Example(features=tf.train.Features(feature={'image': _bytes_feature(im.tostring()),
                                                                  'weather_labels': _bytes_feature(w_labels.tostring()),
                                                                  'ground_labels': _bytes_feature(g_labels.tostring())}))
    
    writer.write(example.SerializeToString())
    
writer.close()
将图像和标签转储到 TFRecord 文件中后,我们可以使用 tf.data API 创建另一个生成器。思路是使用 TFRecordDataset 从我们的文件中实例化一个对象,并告诉它如何使用 map() 操作解析序列化数据。
featdef = {
           'image': tf.FixedLenFeature(shape=[], dtype=tf.string),
           'weather_labels': tf.FixedLenFeature(shape=[], dtype=tf.string),
           'ground_labels': tf.FixedLenFeature(shape=[], dtype=tf.string)
          }
          
 def _parse_record(example_proto, clip=False):
    """Parse a single record into image, weather labels, ground labels"""
    example = tf.parse_single_example(example_proto, featdef)
    im = tf.decode_raw(example['image'], tf.float32)
    im = tf.reshape(im, (IM_SIZE, IM_SIZE, 3))
    weather = tf.decode_raw(ex['weather_labels'], tf.float32)
    ground = tf.decode_raw(ex['ground_labels'], tf.float32)
    return im, weather, ground

# Construct a TFRecordDataset
ds_train = tf.data.TFRecordDataset('./data/KagglePlanetTFRecord_{}'.format(IM_SIZE)).map(_parse_record)
ds_train = ds_train.shuffle(1000).batch(32)
Dataset 对象提供多种方法来生成迭代器对象,以遍历数据。但是,从 TensorFlow 1.9 开始,我们可以直接将 ds_train 传递给 model.fit() 来训练模型。
model = tf.keras.Model(inputs=image_input, outputs=[weather_output, ground_output])

model.compile(optimizer='adam',
              loss={'weather': 'categorical_crossentropy',
                    'ground': 'binary_crossentropy'})

history = model.fit(ds_train, 
                    steps_per_epoch=100, # let's take just a couple of steps
                    epochs=1)
Epoch 1/1
100/100 [==============================] - 76s 755ms/step - loss: 0.5460 - weather_loss: 0.3780 - ground_loss: 0.1680
效果不错。这种操作方式让习惯使用 TFRecords 的人员也能使用 tf.keras。如果要使用验证数据,可以简单地使用验证数据实例化另一个 Dataset,并将其也传递给 model.fit()

. . .

提供模型服务

什么是模型服务?我们想要的是:在客户端,我们有一个输入图像。我们想将此图像包装在某种消息中,将其发送到托管我们已训练模型的远程服务器,最后接收服务器的预测结果作为响应。
提供 ML 模型服务:客户端发送包含输入的请求,服务器从模型中获取预测结果,并将其作为响应发送回客户端。
首先,我们想要将模型导出为服务器可以处理的格式。TensorFlow 提供了 SavedModel 格式,作为导出模型的通用格式。在幕后,我们的 tf.keras 模型完全由 TensorFlow 对象指定,因此我们可以使用 TensorFlow 方法轻松导出它。

导出模型的主要思路是通过签名定义来指定推理计算。SignatureDef 完全由输入和输出张量指定,并最终与模型权重一起存储。但是,TensorFlow 提供了一个方便的函数 tf.saved_model.simple_save() ,它隐藏了一些细节,并且适用于大多数用例。
import tensorflow as tf

# The export path contains the name and the version of the model
tf.keras.backend.set_learning_phase(0) # Ignore dropout at inference
model = tf.keras.models.load_model('./model.h5')
export_path = './PlanetModel/1'

# Fetch the Keras session and save the model
# The signature definition is defined by the input and output tensors
# And stored with the default serving key
with tf.keras.backend.get_session() as sess:
    tf.saved_model.simple_save(
        sess,
        export_path,
        inputs={'input_image': model.input},
        outputs={t.name:t for t in model.outputs})
INFO:tensorflow:No assets to save.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: ./PlanetModel/1/saved_model.pb
请注意,我在导出路径中指定了一个版本号。这样做的原因是TensorFlow Serving 从存储模型的目录名称中推断模型版本。如果我们以后有了更好的模型,可以将其存储在 PlanetModel/2 下,TF Serving 会自动更新以托管新模型。模型图存储在版本子目录中,变量存储在另一个子目录中。
$ tree
.
└── 1
    ├── saved_model.pb
    └── variables
        ├── variables.data-00000-of-00001
        └── variables.index
在设置实际服务器之前,我想重点介绍一下 TensorFlow 的 SavedModel 命令行工具,它对于快速查看模型的输入和输出规范很有用。
$ saved_model_cli show --dir ./ --all
The given SavedModel SignatureDef contains the following input(s):
  inputs['input_image'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 128, 128, 3)
      name: input_layer_2:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['ground_2/Sigmoid:0'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 13)
      name: ground_2/Sigmoid:0
  outputs['weather_2/Softmax:0'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 4)
      name: weather_2/Softmax:0
Method name is: tensorflow/serving/predict
我们甚至可以在 CLI 中使用 numpy(作为 np)来向模型发送一些随机输入,以验证其是否有效。
$ saved_model_cli run --dir ./ --tag_set serve --signature_def serving_default --input_exp 'input_image=np.random.rand(1,128,128,3)'
Result for output key ground_2/Sigmoid:0:
[[6.5955728e-01 9.8123280e-03 1.4992488e-02 1.9942504e-06 3.5892407e-07
  3.2538961e-04 2.4094069e-02 6.0808718e-01 9.8486900e-01 7.9137814e-01
  1.4336356e-05 1.6872218e-05 3.8697788e-01]]
Result for output key weather_2/Softmax:0:
[[7.1896911e-01 2.9373894e-04 2.5214682e-05 2.8071195e-01]]
看起来没问题!

. . .

使用 TensorFlow Serving 托管模型服务器

我们将使用 TensorFlow Serving 库来托管模型。

TensorFlow Serving 是一个灵活的高性能的机器学习模型服务系统,专为生产环境设计

Servables 是 TensorFlow Serving 中的核心抽象,它将代表我们的模型。除此之外,TF Serving 还提供了SourcesLoadersManagers,它们负责处理实际的服务、新版本的加载以及旧版本的卸载。

在本教程中,我们将在本地设置服务器。在生产环境中,您可以在某个微服务架构中以完全相同的方式设置服务器,例如在 Kubernetes 集群中的 pod 上。

在顶级模型目录中托管模型只需要一个命令。我截断了一些输出,并在下面突出显示了 TF Serving 后端的一些组件。
$ tensorflow_model_server --model_base_path=$(pwd) --rest_api_port=9000 --model_name=PlanetModel
I tensorflow_serving/core/basic_manager] Successfully reserved resources to load servable {name: PlanetModel version: 1}
I tensorflow_serving/core/loader_harness.cc] Loading servable version {name: PlanetModel version: 1}
I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc] Loading SavedModel with tags: { serve };
I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc] SavedModel load for tags { serve }; Status: success. Took 1048518 microseconds.
I tensorflow_serving/core/loader_harness.cc] Successfully loaded servable version {name: PlanetModel version: 1}
I tensorflow_serving/model_servers/main.cc] Exporting HTTP/REST API at:localhost:9000 ...
服务器启动并运行后,我们可以向其发送请求。从 TensorFlow Serving 1.8 开始,可以通过 gRPC 或 HTTP 调用托管模型。在这两种情况下,思路都是一样的:我们希望填充一个有效负载消息并将其发送到服务器,服务器应该返回包含预测的消息。对于 gRPC,有可用的 python 绑定以 protobuf 文件的形式填充消息。要发送 HTTP 请求,我们可以简单地使用 python requests 模块将我们的输入包装在有效负载 json 中。
import requests
import json

image = img_to_array(load_img('./data/train/train_10001.jpg', target_size=(128,128))) / 255.
payload = {
  "instances": [{'input_image': image.tolist()}]
}
r = requests.post('http://localhost:9000/v1/models/PlanetModel:predict', json=payload)
json.loads(r.content)
请求 URL 是根据 TF Serving 文档中描述的一些规则组成的。发送请求后,服务器会在几毫秒内返回天气和地面标签的输出列表。
{u'predictions': [
     {u'ground_2/Sigmoid:0': [
        0.153237,
        0.000527727,
        0.00555856,
        0.00542973,
        0.00105254,
        0.000256282,
        0.103614,
        0.0325185,
        0.998204,
        0.072204,
        0.00745501,
        0.00326175,
        0.0942268],
   u'weather_2/Softmax:0': [
        0.963947,
        0.000207846,
        0.00113924,
        0.0347063]
     }]}
预测:原始森林,晴朗的天空,但错过了农业和道路。回到训练!

. . .

总结

Tf.keras 将 Keras 的全部功能和灵活性带给 TensorFlow 用户。使用它很愉快,它与 TensorFlow 核心代码的集成无疑是朝着让更广泛的受众使用深度学习迈出的重要一步。它们可以导出到 SavedModel 格式,并像任何其他 TF 模型一样使用 TensorFlow Serving 提供服务,这使得在生产环境中使用 tf.keras 变得非常简单。

如果你想开始使用 tf.keras,请务必查看更新的 TensorFlow 教程:https://tensorflowcn.cn/tutorials/。对于 TensorFlow Serving,我建议看看在线文档以及github.
下一篇文章
Training and Serving ML models with tf.keras

发布者:Stijn DecubberML6 的机器学习工程师。


Keras 是一个运行在多个后端之上的神经网络高级接口。它的函数式 API 非常用户友好,并且足够灵活,可以构建各种应用程序。Keras 在推出后迅速流行起来,并在 2017 年,Keras API 被集成到 TensorFlow 核心代码中,成为 tf.keras。尽管 tf.kerasand …