创建、上传和使用流水线模板

流水线模板是一种资源,可用于发布工作流定义,以供单个用户或多个用户多次重复使用。

Kubeflow Pipelines SDK 注册表客户端是一个新的客户端界面,您可将其与兼容的注册表服务器(如 Artifact Registry)搭配使用,以便对 Kubeflow Pipelines (KFP) 模板进行版本控制。如需了解详情,请参阅在 Kubeflow Pipelines SDK 注册表客户端中使用模板

本页介绍如何执行以下操作:

  • 创建 KFP 流水线模板
  • 使用 Kubeflow Pipelines SDK 注册表客户端将模板上传到流水线模板代码库
  • 在 Kubeflow Pipelines 客户端中使用模板

准备工作

在构建和运行流水线之前,请按照以下说明在 Google Cloud 控制台中设置 Google Cloud 项目和开发环境。

  1. 安装 Kubeflow Pipelines SDK v2 或更高版本。

    pip install --upgrade kfp>=2,<3
    
  1. 安装 Python 版 Vertex AI SDK v1.15.0 或更高版本。
    (可选)在安装之前,请运行以下命令以查看当前安装的 Python 版 Vertex AI SDK 版本:

      pip freeze | grep google-cloud-aiplatform
    
  2. (可选)安装 390.0.0 或更高版本的 Google Cloud CLI

  3. 启用 Artifact Registry API

配置权限

如果您还没有为 Vertex AI Pipelines 设置 gcloud CLI 项目,请按照为 Vertex AI Pipelines 配置 Google Cloud 项目中的说明进行操作。

此外,请分配以下预定义的 Identity and Access Management 权限,以将 Artifact Registry 用作模板注册表:

  • roles/artifactregistry.admin:分配此角色可创建和管理制品库。
  • roles/artifactregistry.repoAdminroles/artifactregistry.writer:分配其中任一角色可管理制品库内的模板。
  • roles/artifactregistry.reader:分配此角色可从制品库下载模板。
  • roles/artifactregistry.reader:将此角色分配给与 Vertex AI Pipelines 关联的服务账号可基于模板创建流水线运行作业。

如需详细了解 Artifact Registry 的预定义 Identity and Access Management 角色,请参阅 Artifact Registry 的预定义角色

使用以下示例分配角色:

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=PRINCIPAL \
    --role=ROLE

请替换以下内容:

  • PROJECT_ID:您要在其中创建流水线的项目。
  • PRINCIPAL:您要向其添加权限的主账号。
  • ROLE:您要向主账号授予的 Identity and Access Management 角色。

如需详细了解以下内容,请参阅 Artifact Registry 文档中的角色和权限

在 Artifact Registry 中创建代码库

接下来,您需要在 Artifact Registry 中为流水线模板创建一个代码库。

控制台

  1. 在 Google Cloud 控制台中打开 Vertex AI Pipelines

    前往 Vertex AI Pipelines

  2. 点击您的模板标签页。

  3. 如需打开选择制品库窗格,请点击选择制品库

  4. 点击创建代码库

  5. 指定 quickstart-kfp-repo 作为该代码库的名称。

  6. 格式下,选择 Kubeflow Pipelines

  7. 位置类型下,选择区域

  8. 区域下拉列表中,选择 us-central1

  9. 点击创建

Google Cloud CLI

运行以下命令以创建代码库。

在使用下面的命令数据之前,请先进行以下替换:

  • LOCATION:创建代码库的位置或区域,例如 us-central1

执行 gcloud artifacts repositories create 命令:

Linux、macOS 或 Cloud Shell

gcloud artifacts repositories create quickstart-kfp-repo --location=LOCATION_ID --repository-format=KFP

Windows (PowerShell)

gcloud artifacts repositories create quickstart-kfp-repo --location=LOCATION_ID --repository-format=KFP

Windows (cmd.exe)

gcloud artifacts repositories create quickstart-kfp-repo --location=LOCATION_ID --repository-format=KFP
 

创建模板

使用以下代码示例定义具有单个组件的流水线。如需了解如何使用 KFP 定义流水线,请参阅构建流水线

from kfp import dsl
from kfp import compiler

@dsl.component()
def hello_world(text: str) -> str:
    print(text)
    return text

@dsl.pipeline(name='hello-world', description='A simple intro pipeline')
def pipeline_hello_world(text: str = 'hi there'):
    """Pipeline that passes small pipeline parameter string to consumer op."""

    consume_task = hello_world(
        text=text)  # Passing pipeline parameter as argument to consumer op

compiler.Compiler().compile(
    pipeline_func=pipeline_hello_world,
    package_path='hello_world_pipeline.yaml')

运行该示例时,compiler.Compiler().compile(...) 语句会将“hello-world”流水线编译为名为 hello_world_pipeline.yaml 的本地 YAML 文件。

上传模板

控制台

  1. 在 Google Cloud 控制台中打开 Vertex AI Pipelines

    前往 Vertex AI Pipelines

  2. 点击上传以打开上传流水线或组件窗格。

  3. 代码库下拉列表中,选择 quickstart-kfp-repo 代码库。

  4. 为流水线模板指定一个名称

  5. File(文件)字段中,点击 Choose(选择),从本地文件系统中选择并上传已编译的流水线模板 YAML。

  6. 上传流水线模板后,它会列在您的模板页面上。

    前往您的模板

Kubeflow Pipelines SDK 客户端

  1. 如需配置 Kubeflow Pipelines SDK 注册表客户端,请运行以下命令:

    from kfp.registry import RegistryClient
    
    client = RegistryClient(host=f"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo")
    
  2. 将已编译的 YAML 文件上传到 Artifact Registry 中的代码库。

    templateName, versionName = client.upload_pipeline(
      file_name="hello_world_pipeline.yaml",
      tags=["v1", "latest"],
      extra_headers={"description":"This is an example pipeline template."})
    
  3. 如需验证模板是否已上传,请执行以下操作:

    1. 在 Google Cloud 控制台中打开 Vertex AI Pipelines

      前往 Vertex AI Pipelines

    2. 点击您的模板标签页。

    3. 点击选择制品库

    4. 从列表中选择 quickstart-kfp-repo 代码库,然后点击选择

    5. 您应该会在列表中找到上传的模板软件包 hello-world

    6. 如需查看流水线模板的版本列表,请点击 hello-world 模板。

    7. 如需查看流水线拓扑,请点击版本。

在 Vertex AI 中使用模板

将流水线模板上传到 Artifact Registry 中的代码库后,您就可以在 Vertex AI Pipelines 中使用它了。

为模板创建暂存存储桶

在使用流水线模板之前,您需要为暂存流水线运行创建一个 Cloud Storage 存储桶。

如需创建存储桶,请按照为流水线工件配置 Cloud Storage 存储桶中的说明操作,然后运行以下命令:

STAGING_BUCKET="gs://BUCKET_NAME"

BUCKET_NAME 替换为您刚刚创建的存储桶的名称。

基于模板创建流水线运行

您可以使用 Python 版 Vertex AI SDK 或 Google Cloud 控制台通过 Artifact Registry 中的模板创建流水线运行。

控制台

  1. 在 Google Cloud 控制台中打开 Vertex AI Pipelines

    前往 Vertex AI Pipelines

  2. 点击您的模板标签页。

  3. 如需打开选择制品库窗格,请点击选择制品库

  4. 选择 quickstart-kfp-repo 制品库,然后点击选择

  5. 点击 hello-world 软件包。

  6. 4f245e8f9605 版本旁边,点击创建运行作业

  7. 点击运行时配置

  8. Cloud Storage 位置下,输入以下内容:

    gs://BUCKET_NAME
    

    BUCKET_NAME 替换为您为暂存流水线运行创建的存储桶的名称。

  9. 点击提交

Python 版 Vertex AI SDK

使用以下示例基于模板创建流水线运行:

from google.cloud import aiplatform

# Initialize the aiplatform package
aiplatform.init(
    project="PROJECT_ID",
    location='us-central1',
    staging_bucket=STAGING_BUCKET)

# Create a pipeline job using a version ID.
job = aiplatform.PipelineJob(
    display_name="hello-world-latest",
    template_path="https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world@SHA256_TAG" + \
        versionName)

# Alternatively, create a pipeline job using a tag.
job = aiplatform.PipelineJob(
    display_name="hello-world-latest",
    template_path="https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/TAG")

job.submit()

请替换以下内容:

  • PROJECT_ID:在其中运行此流水线的 Google Cloud 项目。

  • SHA256_TAG:模板版本的 sha256 哈希值。

  • TAG:模板的版本标记。

查看已创建的流水线运行

您可以在 Python 版 Vertex AI SDK 中查看由特定流水线版本创建的运行作业。

控制台

  1. 在 Google Cloud 控制台中打开 Vertex AI Pipelines

    前往 Vertex AI Pipelines

  2. 点击您的模板标签页。

  3. 点击选择制品库

  4. 从列表中选择 quickstart-kfp-repo 代码库,然后点击选择

  5. 如需查看 hello-world 流水线模板的版本列表,请点击 hello world 模板。

  6. 点击要查看其流水线运行的所需版本。

  7. 如需查看所选版本的流水线运行,请点击查看运行,然后点击运行标签页。

Python 版 Vertex AI SDK

如需列出流水线运行,请运行 pipelineJobs.list 命令,如以下一个或多个示例所示:

  from google.cloud import aiplatform

  # To filter all runs created from a specific version
  filter = 'template_uri:"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/*" AND ' + \
           'template_metadata.version="%s"' % versionName
  aiplatform.PipelineJob.list(filter=filter)

  # To filter all runs created from a specific version tag
  filter = 'template_uri="https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/latest"'
  aiplatform.PipelineJob.list(filter=filter)

  # To filter all runs created from a package
  filter = 'template_uri:"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/hello-world/*"'
  aiplatform.PipelineJob.list(filter=filter)

  # To filter all runs created from a repo
  filter = 'template_uri:"https://us-central1-kfp.pkg.dev/PROJECT_ID/quickstart-kfp-repo/*"'
  aiplatform.PipelineJob.list(filter=filter)

在 Kubeflow Pipelines SDK 注册表客户端中使用模板

您可以将 Kubeflow Pipelines SDK 注册表客户端与 Artifact Registry 结合使用来下载和使用流水线模板。

  • 如需列出代码库中的资源,请运行以下命令:

    templatePackages = client.list_packages()
    templatePackage = client.get_package(package_name = "hello-world")
    
    versions = client.list_versions(package_name="hello-world")
    version = client.get_version(package_name="hello-world", version=versionName)
    
    tags = client.list_tags(package_name = "hello-world")
    tag = client.get_tag(package_name = "hello-world", tag="latest")
    

    如需查看可用方法和文档的完整列表,请参阅 Artifact Registry GitHub 代码库中的 proto 文件。

  • 如需将模板下载到本地文件系统,请运行以下命令:

    # Sample 1
    filename = client.download_pipeline(
      package_name = "hello-world",
      version = versionName)
    # Sample 2
    filename = client.download_pipeline(
      package_name = "hello-world",
      tag = "v1")
    # Sample 3
    filename = client.download_pipeline(
      package_name = "hello-world",
      tag = "v1",
      file_name = "hello-world-template.yaml")
    

使用 Artifact Registry REST API

以下部分总结了如何使用 Artifact Registry REST API 来管理 Artifact Registry 代码库中的流水线模板。

使用 Artifact Registry REST API 上传流水线模板

您可以使用本部分所述的参数值创建 HTTP 请求来上传流水线模板,其中:

  • PROJECT_ID 是在其中运行此流水线的 Google Cloud 项目。
  • REPO_ID 是您的 Artifact Registry 代码库的 ID。

curl 请求示例

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -F tags=v1,latest \
    -F content=@pipeline_spec.yaml \
    https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID

构建上传请求

该请求可以是 HTTP 或 HTTPS 分段请求;必须在请求标头中包含身份验证令牌。如需了解详情,请参阅 gcloud auth print-access-token

请求的载荷是 pipeline_spec.yaml 文件(或 .zip 软件包)的内容。建议的大小上限为 10 MiB。

软件包名称取自 pipeline_spec.yaml 文件中的 pipeline_spec.pipeline_info.name 条目。软件包名称唯一标识软件包,并且在各个版本中不可变。长度在 4 到 128 个字符之间,并且必须与以下正则表达式匹配:^[a-z0-9][a-z0-9-]{3,127}$

软件包 tags 是一个最多包含 8 个以英文逗号分隔的标记的列表。每个标记必须与以下正则表达式匹配:^[a-zA-Z0-9\-._~:@+]{1,128}$

如果标记存在并指向已上传的流水线,则该标记将更新为指向您当前正在上传的流水线。例如,如果 latest 标记指向您已上传的流水线,并且您使用 --tag=latest 上传新版本,则系统会从之前上传的流水线移除 latest 标记并将其分配给您正在上传的新流水线。

如果您上传的流水线与之前上传的流水线相同,则上传成功。系统会更新所上传流水线的元数据(包括其版本标记),以与上传请求的参数值匹配。

上传响应

如果上传请求成功,则返回 HTTP OK 状态。响应的正文如下所示:

{packageName}/{versionName=sha256:abcdef123456...}

其中 versionName 是以十六进制字符串表示的 pipeline_spec.yaml 的 sha256 摘要。

使用 Artifact Registry REST API 下载流水线模板

您可以使用本部分所述的参数值创建 HTTP 请求来下载流水线模板,其中:

  • PROJECT_ID 是在其中运行此流水线的 Google Cloud 项目。
  • REPO_ID 是您的 Artifact Registry 代码库的 ID。
  • PACKAGE_ID 是您上传的模板的软件包 ID。
  • TAG 是版本标记。
  • VERSION 是模板版本,格式为 sha256:abcdef123456...

对于标准 Artifact Registry 下载,您应按如下方式创建下载链接:

url = https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/VERSION
url = https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/TAG

curl 请求示例

curl -X GET \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/VERSION

您可以将 VERSION 替换为 TAG 并下载同一模板,如以下示例所示:

curl -X GET \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    https://us-central1-kfp.pkg.dev/PROJECT_ID/REPO_ID/PACKAGE_ID/TAG

下载响应

如果下载请求成功,则返回 HTTP OK 状态。响应的正文是 pipeline_spec.yaml 文件的内容。