在 Dataproc 中啟用 Spark 資料系譜

本文說明如何在專案叢集層級,為 Dataproc Spark 工作啟用資料沿襲

資料歷程Dataplex Universal Catalog 的功能,可讓您追蹤資料在系統之間的移動情形,包括來源、傳遞目的地和採用的轉換機制。

資料沿襲適用於所有 Dataproc Spark 工作 (SparkR 和 Spark 串流工作除外),並支援 BigQuery 和 Cloud Storage 資料來源。Dataproc on Compute Engine 2.0.74 以上、2.1.22 以上、2.2.50 以上的映像檔版本均內含此功能。

在 Dataproc 叢集中啟用這項功能後,Dataproc Spark 工作就會擷取資料歷程事件,並發布至 Dataplex Universal Catalog Data Lineage API。Dataproc 會透過 OpenLineage 與 Data Lineage API 整合,並使用 OpenLineage Spark 外掛程式

您可以透過 Dataplex Universal Catalog 存取資料歷程資訊,方法如下:

事前準備

  1. 在 Google Cloud 控制台的專案選擇器頁面中,選取包含要追蹤沿襲的 Dataproc 叢集的專案。

    前往專案選取器

  2. 啟用 Data Lineage API。

    啟用 API

必要的角色

如果您使用預設 VM 服務帳戶建立 Dataproc 叢集,該叢集會具備 Dataproc Worker 角色,因此可啟用資料沿襲功能。你不需要採取其他動作。

不過,如果您建立的 Dataproc 叢集使用自訂服務帳戶,則必須將必要角色授予自訂服務帳戶,才能在叢集上啟用資料沿襲功能,詳情請參閱下段說明。

如要取得使用 Dataproc 資料沿襲所需的權限,請要求管理員在叢集的自訂服務帳戶中,授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

在專案層級啟用 Spark 資料歷程

您可以在專案層級啟用 Spark 資料沿襲功能。如果是在專案啟用資料沿襲功能後建立的叢集上執行支援的 Spark 工作,系統就會啟用資料沿襲功能。請注意,在啟用專案層級的資料沿襲功能前建立的叢集,其執行工作不會啟用資料沿襲功能。

在專案層級啟用 Spark 資料歷程

如要在專案層級啟用 Spark 資料沿襲,請設定下列自訂專案中繼資料

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

如要在專案層級停用 Spark 資料沿襲,請將 DATAPROC_LINEAGE_ENABLED 中繼資料設為 false

在叢集層級啟用 Spark 資料歷程

建立叢集時,您可以啟用 Spark 資料歷程,這樣提交至叢集的所有支援 Spark 工作都會啟用資料歷程。

在叢集層級啟用 Spark 資料歷程

如要在叢集上啟用 Spark 資料沿襲,請建立 Dataproc 叢集,並將 dataproc:dataproc.lineage.enabled 叢集屬性設為 true

2.0 版映像檔叢集:需要 Dataproc 叢集 VM 存取權cloud-platform範圍,才能使用 Spark 資料沿襲功能。使用 2.1 以上版本建立的 Dataproc 映像檔版本叢集,會啟用 cloud-platform。如果您在建立叢集時指定 Dataproc 映像檔版本 2.0,請將範圍設為 cloud-platform

gcloud CLI 範例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

在工作中停用 Spark 資料歷程

如果在叢集層級啟用 Spark 資料沿襲,提交工作時傳遞具有空值 ("") 的 spark.extraListeners 屬性,即可停用特定工作的 Spark 資料沿襲。

啟用後,您就無法在叢集上停用 Spark 資料沿襲。如要清除所有叢集作業的 Spark 資料沿襲,可以重新建立叢集,但不要使用 dataproc:dataproc.lineage.enabled 屬性。

提交 Spark 工作

在啟用 Spark 資料沿襲功能後建立的 Dataproc 叢集上提交 Spark 工作時,Dataproc 會擷取資料沿襲資訊,並向 Data Lineage API 回報。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

注意:

  • 新增 spark.openlineage.namespacespark.openlineage.appName 屬性 (用於識別工作) 是選用步驟。如未新增這些屬性,Dataproc 會使用下列預設值:
    • spark.openlineage.namespace 的預設值:PROJECT_ID
    • spark.openlineage.appName 的預設值:spark.app.name

在 Dataplex Universal Catalog 中查看歷程

歷程圖會顯示專案資源之間的關係,以及建立這些資源的程序。您可以在 Google Cloud 控制台中查看資料沿襲資訊,也可以從 Data Lineage API 擷取 JSON 格式的資料沿襲資訊。

PySpark 程式碼範例:

下列 PySpark 工作會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集的新資料表。並使用 Cloud Storage bucket 做為暫時儲存空間。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

請將下列項目改為對應的值:

  • BUCKET:現有 Cloud Storage bucket 的名稱。

  • PROJECT_IDDATASETTABLE:插入您的專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)。

您可以在 Dataplex Universal Catalog UI 中查看歷程圖。

歷程圖範例

後續步驟