在 Dataproc 中启用 Spark 数据沿袭

本文档介绍了如何在项目集群级层为 Dataproc Spark 作业启用数据沿袭

数据沿袭Dataplex Universal Catalog 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。

数据沿袭适用于所有 Dataproc Spark 作业(SparkR 和 Spark 流式作业除外),并支持 BigQuery 和 Cloud Storage 数据源。此功能包含在 Dataproc on Compute Engine 2.0.74+、2.1.22+、2.2.50 及更高映像版本中。

在 Dataproc 集群中启用此功能后,Dataproc Spark 作业会捕获数据沿袭事件并将其发布到 Dataplex Universal Catalog Data Lineage API。Dataproc 使用 OpenLineage Spark 插件通过 OpenLineage 与 Data Lineage API 集成。

您可以通过 Dataplex Universal Catalog 使用以下方式访问数据沿袭信息:

准备工作

  1. 在 Google Cloud 控制台中的项目选择器页面上,选择包含您要跟踪其沿袭的 Dataproc 集群的项目。

    转到“项目选择器”

  2. 启用 Data Lineage API。

    启用 API

所需的角色

如果您使用默认的虚拟机服务账号创建 Dataproc 集群,该账号将具有 Dataproc Worker 角色,从而启用数据沿袭。除此之外没有其他要求。

不过,如果您创建的 Dataproc 集群使用自定义服务账号,则必须按照下段所述向该自定义服务账号授予所需角色,才能在该集群上启用数据沿袭。

如需获得将数据沿袭与 Dataproc 搭配使用所需的权限,请让您的管理员为您授予集群自定义服务账号的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

在项目级层启用 Spark 数据沿袭

您可以在项目级层启用 Spark 数据沿袭。为项目启用数据沿袭后创建的集群上运行的受支持的 Spark 作业会启用数据沿袭。在现有集群(在项目级层启用数据沿袭之前创建的集群)上运行的作业不会启用数据沿袭。

在项目级层启用 Spark 数据沿袭

如需在项目级层启用 Spark 数据沿袭,请设置以下自定义项目元数据

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

您可以通过将 DATAPROC_LINEAGE_ENABLED 元数据设置为 false 在项目级层停用 Spark 数据沿袭。

在集群级层启用 Spark 数据沿袭

您可以在创建集群时启用 Spark 数据沿袭,以便向集群提交的所有受支持的 Spark 作业都会启用数据沿袭。

在集群级层启用 Spark 数据沿袭

如需在集群上启用 Spark 数据沿袭,请创建 Dataproc 集群,并将 dataproc:dataproc.lineage.enabled 集群属性设置为 true

2.0 映像版本集群:Spark 数据沿袭需要 Dataproc 集群虚拟机 cloud-platform 访问范围。使用映像版本 2.1 及更高版本创建的 Dataproc 映像版本集群已启用 cloud-platform。如果您在创建集群时指定 Dataproc 映像版本 2.0,请将范围设置为 cloud-platform

gcloud CLI 示例

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

在作业上停用 Spark 数据沿袭

如果您在集群级层启用了 Spark 数据沿袭,则可以在提交作业时传递值为空 ("") 的 spark.extraListeners 属性,从而在特定作业上停用 Spark 数据沿袭。

启用后,您无法在集群上停用 Spark 数据沿袭。如需在所有集群作业上消除 Spark 数据沿袭,您可以重新创建集群,但不使用 dataproc:dataproc.lineage.enabled 属性。

提交 Spark 作业

当您在启用了 Spark 数据沿袭的 Dataproc 集群上提交 Spark 作业时,Dataproc 会捕获数据沿袭信息并将其报告给 Data Lineage API。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

注意:

  • 添加用于唯一标识作业的 spark.openlineage.namespacespark.openlineage.appName 属性是可选的。如果您不添加这些属性,Dataproc 将使用以下默认值:
    • spark.openlineage.namespace 的默认值:PROJECT_ID
    • spark.openlineage.appName 的默认值:spark.app.name

在 Dataplex Universal Catalog 中查看沿袭

沿袭图显示了项目资源与创建这些资源的进程之间的关系。您可以在 Google Cloud 控制台中查看数据沿袭信息,也可以从 Data Lineage API 中以 JSON 数据的形式检索该信息。

PySpark 示例代码:

以下 PySpark 作业从公共 BigQuery 表中读取数据,然后将输出写入现有 BigQuery 数据集中的新表。它使用 Cloud Storage 存储桶进行临时存储。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

进行以下替换:

  • BUCKET:现有 Cloud Storage 存储桶的名称。

  • PROJECT_IDDATASETTABLE:插入您的项目 ID、现有 BigQuery 数据集的名称,以及要在该数据集中创建的新表的名称(该表不得存在)。

您可以在 Dataplex Universal Catalog 界面中查看沿袭图。

沿袭图示例

后续步骤