使用自定义 scikit-learn 流水线进行预测

Colab 徽标 在 Colab 中以笔记本的形式运行本教程 GitHub 徽标 在 GitHub 上查看笔记本

本教程介绍如何通过 AI Platform Prediction 部署一个使用自定义转换器的 scikit-learn 流水线。

借助 scikit-learn 流水线,您可以编写多个 Estimator。例如,您可以使用转换器预处理数据,然后将转换后的数据传递给分类器。scikit-learn 在 sklearn 软件包中提供了多个转换器

您还可以使用 scikit-learn 的 FunctionTransformerTransformerMixin 类创建自定义转换器。如果您想要将使用自定义转换器的流水线部署到 AI Platform Prediction,则必须将该代码以为源分发软件包的形式提供给 AI Platform Prediction

本教程展示了一个涉及人口普查数据的示例问题,以引导您完成以下步骤:

  • 在 AI Platform Training 上训练一个带有自定义转换器的 scikit-learn 流水线
  • 将经过训练的流水线和您的自定义代码部署到 AI Platform Prediction
  • 从该部署中处理预测请求

数据集

本教程使用加利福尼亚大学欧文分校机器学习存储库提供的美国人口普查收入数据集。该数据集包含 1994 年人口普查数据库中人员的相关信息,包括年龄、教育程度、婚姻状况、职业以及年收入是否超过 5 万美元。

本教程中使用的数据可在公开 Cloud Storage 存储分区中获取:gs://cloud-samples-data/ai-platform/sklearn/census_data/

目标

目标是训练一个 scikit-learn 流水线,以根据某人的其他人口普查信息(特征)预测其年收入是否超过 $50000(目标标签)。

本教程重点介绍如何将此模型与 AI Platform Prediction 结合使用,而不是仅仅介绍模型本身的设计。 但在构建机器学习系统时,请务必始终考虑到潜在问题和意外后果。请参阅有关公平性的机器学习速成课程练习,详细了解人口普查数据集中的偏差来源及机器学习公平性。

费用

本教程使用 Google Cloud 的以下收费组件:

  • AI Platform Training
  • AI Platform Prediction
  • Cloud Storage

了解 AI Platform Training 价格AI Platform Prediction 价格Cloud Storage 价格,并使用价格计算器根据您的预计使用情况来估算费用。

准备工作

在 AI Platform Prediction 中训练和部署模型之前,您必须先完成以下事项:

  • 设置本地开发环境。
  • 设置 Google Cloud 项目,并启用结算功能和必要的 API。
  • 创建 Cloud Storage 存储分区以存储您的训练软件包和经过训练的模型。

设置本地开发环境

您需要以下资源才能完成本教程:

  • Python 3
  • virtualenv
  • Google Cloud SDK

有关设置 Python 开发环境的 Google Cloud 指南详细说明了如何满足这些要求。以下步骤提供了一系列简要的说明:

  1. 安装 Python 3

  2. 安装 virtualenv 并创建一个使用 Python 3 的虚拟环境。

  3. 激活该环境。

  4. 完成以下部分中的步骤以安装 Google Cloud SDK。

设置您的 Google Cloud 项目

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the AI Platform Training & Prediction and Compute Engine APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the AI Platform Training & Prediction and Compute Engine APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

验证您的 GCP 账号

要设置身份验证,您需要创建服务账号密钥并为服务账号密钥的文件路径设置环境变量。

  1. 创建服务账号:

    1. 在 Google Cloud 控制台中,转到创建服务账号页面。

      转到“创建服务账号”

    2. 服务账号名称字段中,输入一个名称。
    3. 可选:在服务账号说明字段中,输入说明。
    4. 点击创建
    5. 点击选择角色字段。在所有角色下,选择 AI Platform > AI Platform Admin
    6. 点击添加其他角色
    7. 点击选择角色字段。在所有角色下,选择存储 > Storage Object Admin

    8. 点击完成以创建服务账号。

      不要关闭浏览器窗口。您将在下一步骤中用到它。

  2. 创建用于身份验证的服务账号密钥:

    1. 在 Google Cloud 控制台中,点击您创建的服务账号的电子邮件地址。
    2. 点击密钥
    3. 依次点击添加密钥创建新密钥
    4. 点击创建。JSON 密钥文件将下载到您的计算机上。
    5. 点击关闭
  3. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含服务账号密钥的 JSON 文件的文件路径。此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

创建 Cloud Storage 存储分区

本教程以多种方式使用 Cloud Storage:

  • 如果您使用 Cloud SDK 提交训练作业,则将包含训练代码的 Python 软件包上传到 Cloud Storage 存储分区。 AI Platform Training 运行此软件包中的代码。

  • 在本教程中,AI Platform Training 还会将您的作业生成的训练模型保存在同一个存储分区中。

  • 如需将使用自定义代码的 scikit-learn 流水线部署到 AI Platform Prediction,您必须将流水线使用的自定义转换器上传到 Cloud Storage。

在创建用于提供预测服务的 AI Platform Prediction 版本资源时,您将经过训练的 scikit-learn 流水线和您的自定义代码作为 Cloud Storage URI 提供。

将 Cloud Storage 存储分区的名称设置为环境变量。它在所有 Cloud Storage 存储分区中必须是唯一的:

BUCKET_NAME="your-bucket-name"

选择一个可以使用 AI Platform TrainingAI Platform Prediction 的区域,然后另外创建一个环境变量。例如:

REGION="us-central1"

在您选择的区域中创建 Cloud Storage 存储分区,稍后使用同一区域进行训练和预测。如果存储分区尚不存在,请运行以下命令创建一个:

gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

创建训练应用和自定义流水线代码

创建一个应用,以使用人口普查数据训练 scikit-learn 流水线。在本教程中,训练软件包还包含经过训练的流水线在预测过程中使用的自定义代码。该模式有用,因为流水线通常设计为在训练和预测过程中使用相同的转换器。

按照以下步骤创建一个与以下结构匹配且包含三个文件的目录:

census_package/
    __init__.py
    my_pipeline.py
    train.py

首先,创建空 census_package/ 目录:

mkdir census_package

census_package/ 中,创建一个名为 __init__.py 的空白文件:

touch ./census_package/__init__.py

这样便能够将 census_package/ 作为软件包导入 Python 中。

创建自定义转换器

scikit-learn 提供许多转换器,您可以将这些转换器用作流水线的一部分,还可以通过这些转换器来自定义转换器。这些转换器甚至可以在训练过程中获知已保存的状态,稍后在预测过程中使用该状态。

扩展 sklearn.base.TransformerMixin 以定义三个转换器:

  • PositionalSelector:如果提供索引 C 和矩阵 M 的列表,它会返回一个矩阵,其中包含 M 的列的子集,并由 C 指示。

  • StripString:如果提供一个字符串矩阵,它会删除每个字符串中的空格。

  • SimpleOneHotEncoder:简单的独热编码器,可应用于字符串矩阵。

为此,将以下代码写入名为 census_package/my_pipeline.py 的文件。

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin


class PositionalSelector(BaseEstimator, TransformerMixin):
    def __init__(self, positions):
        self.positions = positions

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return np.array(X)[:, self.positions]


class StripString(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        strip = np.vectorize(str.strip)
        return strip(np.array(X))


class SimpleOneHotEncoder(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        self.values = []
        for c in range(X.shape[1]):
            Y = X[:, c]
            values = {v: i for i, v in enumerate(np.unique(Y))}
            self.values.append(values)
        return self

    def transform(self, X):
        X = np.array(X)
        matrices = []
        for c in range(X.shape[1]):
            Y = X[:, c]
            matrix = np.zeros(shape=(len(Y), len(self.values[c])), dtype=np.int8)
            for i, x in enumerate(Y):
                if x in self.values[c]:
                    matrix[i][self.values[c][x]] = 1
            matrices.append(matrix)
        res = np.concatenate(matrices, axis=1)
        return res

定义流水线并创建训练模块

接下来,创建一个训练模块,以使用人口普查数据训练您的 scikit-learn 流水线。此代码的一部分涉及定义流水线。

该训练模块执行以下操作:

  • 下载训练数据,并将该数据加载到 scikit-learn 可以使用的 Pandas DataFrame 中。
  • 定义要训练的 scikit-learn 流水线。该示例仅使用输入数据中的三个数值特征('age''education-num''hours-per-week')和三个分类特征('workclass''marital-status''relationship')。该模块使用 scikit-learn 的内置 StandardScaler 转换数值特征,并使用您在 my_pipeline.py 中定义的自定义独热编码器转换分类特征。然后,它会合并经过预处理的数据以作为分类器的输入。
  • 最后,使用 scikit-learn 中包含的 joblib 版本导出模型,并将模型保存到 Cloud Storage 存储分区。

将以下代码写入 census_package/train.py

import warnings
import argparse
from google.cloud import storage

import pandas as pd
import numpy as np
from sklearn.externals import joblib
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline, FeatureUnion, make_pipeline
import census_package.my_pipeline as mp
warnings.filterwarnings('ignore')


def download_data(bucket_name, gcs_path, local_path):
    bucket = storage.Client().bucket(bucket_name)
    blob = bucket.blob(gcs_path)
    blob.download_to_filename(local_path)


def upload_data(bucket_name, gcs_path, local_path):
    bucket = storage.Client().bucket(bucket_name)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(local_path)


def get_features_target(local_path):
    strip = np.vectorize(str.strip)
    raw_df = pd.read_csv(local_path, header=None)
    target_index = len(raw_df.columns) - 1  # Last columns, 'income-level', is the target

    features_df = raw_df.drop(target_index, axis=1)
    features = features_df.as_matrix()
    target = strip(raw_df[target_index].values)
    return features, target


def create_pipeline():
    # We want to use 3 categorical and 3 numerical features in this sample.
    # Categorical features: age, education-num, and hours-per-week
    # Numerical features: workclass, marital-status, and relationship
    numerical_indices = [0, 4, 12]  # age, education-num, and hours-per-week
    categorical_indices = [1, 5, 7]  # workclass, marital-status, and relationship

    p1 = make_pipeline(mp.PositionalSelector(categorical_indices), mp.StripString(), mp.SimpleOneHotEncoder())
    p2 = make_pipeline(mp.PositionalSelector(numerical_indices), StandardScaler())

    feats = FeatureUnion([
        ('numericals', p1),
        ('categoricals', p2),
    ])

    pipeline = Pipeline([
        ('pre', feats),
        ('estimator', GradientBoostingClassifier(max_depth=4, n_estimators=100))
    ])
    return pipeline


def get_bucket_path(gcs_uri):
    if not gcs_uri.startswith('gs://'):
        raise Exception('{} does not start with gs://'.format(gcs_uri))
    no_gs_uri = gcs_uri[len('gs://'):]
    first_slash_index = no_gs_uri.find('/')
    bucket_name = no_gs_uri[:first_slash_index]
    gcs_path = no_gs_uri[first_slash_index + 1:]
    return bucket_name, gcs_path


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--gcs_data_path', action="store", required=True)
    parser.add_argument('--gcs_model_path', action="store", required=True)

    arguments, others = parser.parse_known_args()

    local_path = '/tmp/adul.data'
    data_bucket, data_path = get_bucket_path(arguments.gcs_data_path)
    print('Downloading the data...')
    download_data(data_bucket, data_path, local_path)
    features, target = get_features_target(local_path)
    pipeline = create_pipeline()

    print('Training the model...')
    pipeline.fit(features, target)

    joblib.dump(pipeline, './model.joblib')

    model_bucket, model_path = get_bucket_path(arguments.gcs_model_path)
    upload_data(model_bucket, model_path, './model.joblib')
    print('Model was successfully uploaded.')

在 AI Platform Training 上训练流水线

使用 gcloud 向 AI Platform Training 提交训练作业。以下命令会封装您的训练应用,将训练应用上传到 Cloud Storage,然后告知 AI Platform Training 运行您的训练模块。

-- 参数是分隔符:AI Platform Training 服务不使用分隔符后面的参数,但您的训练模块仍然可以访问这些参数。

gcloud ai-platform jobs submit training census_training_$(date +"%Y%m%d_%H%M%S") \
  --job-dir gs://$BUCKET_NAME/custom_pipeline_tutorial/job \
  --package-path ./census_package \
  --module-name census_package.train \
  --region $REGION \
  --runtime-version 1.13 \
  --python-version 3.5 \
  --scale-tier BASIC \
  --stream-logs \
  -- \
  --gcs_data_path gs://cloud-samples-data/ai-platform/census/data/adult.data.csv \
  --gcs_model_path gs://$BUCKET_NAME/custom_pipeline_tutorial/model/model.joblib

部署流水线并执行预测

为了从 AI Platform Prediction 执行预测,您必须部署模型资源和版本资源。如果您多次修改和训练流水线,模型可帮助您组织多次部署。版本使用经过训练的模型和自定义代码来提供预测服务。

要部署这些资源,您需要提供两个工件:

  • 包含经过训练的流水线的 Cloud Storage 目录。上一步骤中的训练作业在将 model.joblib 导出到您的存储分区时创建了该文件。
  • Cloud Storage 中的 .tar.gz 源分发软件包,其中包含您的流水线使用的任何自定义转换器。在下一步骤中创建该软件包。

封装您的自定义转换器

如果您部署版本但不提供 my_pipeline.py 中的代码,则 AI Platform Prediction 将无法导入自定义转换器(例如 mp.SimpleOneHotEncoder),并且将无法提供预测服务。

创建以下 setup.py,为您的代码定义源代码分发软件包:

import setuptools
setuptools.setup(name='census_package',
      packages=['census_package'],
      version="1.0",
      )

然后,运行以下命令以创建 dist/census_package-1.0.tar.gz

python setup.py sdist --formats=gztar

最后,将该 tar 压缩文件上传到您的 Cloud Storage 存储分区:

gcloud storage cp ./dist/census_package-1.0.tar.gz gs://$BUCKET_NAME/custom_pipeline_tutorial/code/census_package-1.0.tar.gz

创建模型和版本资源

首先,定义模型和版本名称:

MODEL_NAME='CensusPredictor'
VERSION_NAME='v1'

然后,使用以下命令创建模型资源:

gcloud ai-platform models create $MODEL_NAME \
  --regions $REGION

最后,提供指向您的模型目录(包含 model.joblib 的目录)和自定义代码 (census_package-1.0.tar.gz) 的 Cloud Storage 路径,从而创建版本资源:

gcloud components install beta

gcloud beta ai-platform versions create $VERSION_NAME --model $MODEL_NAME \
  --origin gs://$BUCKET_NAME/custom_pipeline_tutorial/model/ \
  --runtime-version 1.13 \
  --python-version 3.5 \
  --framework SCIKIT_LEARN \
  --package-uris gs://$BUCKET_NAME/custom_pipeline_tutorial/code/census_package-1.0.tar.gz

提供在线预测服务

通过发送在线预测请求试用您的部署。首先,安装 Python 版 Google API 客户端库:

pip install --upgrade google-api-python-client

然后,将人口普查数据的两个实例发送到您已部署的版本:

import googleapiclient.discovery

instances = [
  [39, 'State-gov', 77516, ' Bachelors .  ', 13, 'Never-married', 'Adm-clerical', 'Not-in-family',
   'White', 'Male', 2174, 0, 40, 'United-States', '<=50K'],
  [50, 'Self-emp-not-inc', 83311, 'Bachelors', 13, 'Married-civ-spouse', 'Exec-managerial', 'Husband',
   'White', 'Male', 0, 0, 13, 'United-States', '<=50K']
]

service = googleapiclient.discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(PROJECT_ID, MODEL_NAME, VERSION_NAME)

response = service.projects().predict(
    name=name,
    body={'instances': instances}
).execute()

if 'error' in response:
    raise RuntimeError(response['error'])
else:
  print(response['predictions'])

版本通过经过训练的流水线传递输入数据,并返回分类器的结果:每个实例 <=50K>50K,具体取决于其对此人收入等级的预测。

清理

如需清理此项目中使用的所有 Google Cloud 资源,您可以删除用于本教程的 Google Cloud 项目

您也可以运行以下命令,清理各个资源:

# Delete version resource
gcloud ai-platform versions delete $VERSION_NAME --quiet --model $MODEL_NAME

# Delete model resource
gcloud ai-platform models delete $MODEL_NAME --quiet

# Delete Cloud Storage objects that were created
gcloud storage rm gs://$BUCKET_NAME/custom_pipeline_tutorial --recursive

后续步骤