本文說明如何在專案或叢集層級,為 Dataproc Spark 工作啟用資料沿襲。
資料歷程是 Dataplex Universal Catalog 的功能,可讓您追蹤資料在系統之間的移動情形,包括來源、傳遞目的地和採用的轉換機制。
資料沿襲適用於所有 Dataproc Spark 工作 (SparkR 和 Spark 串流工作除外),並支援 BigQuery 和 Cloud Storage 資料來源。Dataproc on Compute Engine 2.0.74 以上、2.1.22 以上、2.2.50 以上的映像檔版本均內含此功能。
在 Dataproc 叢集中啟用這項功能後,Dataproc Spark 工作就會擷取資料歷程事件,並發布至 Dataplex Universal Catalog Data Lineage API。Dataproc 會透過 OpenLineage 與 Data Lineage API 整合,並使用 OpenLineage Spark 外掛程式。
您可以透過 Dataplex Universal Catalog 存取資料歷程資訊,方法如下:
事前準備
必要的角色
如果您使用預設 VM 服務帳戶建立 Dataproc 叢集,該叢集會具備 Dataproc Worker
角色,因此可啟用資料沿襲功能。你不需要採取其他動作。
不過,如果您建立的 Dataproc 叢集使用自訂服務帳戶,則必須將必要角色授予自訂服務帳戶,才能在叢集上啟用資料沿襲功能,詳情請參閱下段說明。
如要取得使用 Dataproc 資料沿襲所需的權限,請要求管理員在叢集的自訂服務帳戶中,授予您下列 IAM 角色:
-
授予下列其中一個角色:
-
Dataproc Worker (
roles/dataproc.worker
) -
資料歷程編輯者 (
roles/datalineage.editor
) -
資料歷程記錄產生器 (
roles/datalineage.producer
) -
資料歷程管理員 (
roles/datalineage.admin
)
-
Dataproc Worker (
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
在專案層級啟用 Spark 資料歷程
您可以在專案層級啟用 Spark 資料沿襲功能。如果是在專案啟用資料沿襲功能後建立的叢集上執行支援的 Spark 工作,系統就會啟用資料沿襲功能。請注意,在啟用專案層級的資料沿襲功能前建立的叢集,其執行工作不會啟用資料沿襲功能。
在專案層級啟用 Spark 資料歷程
如要在專案層級啟用 Spark 資料沿襲,請設定下列自訂專案中繼資料:
鍵 | 值 |
---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
如要在專案層級停用 Spark 資料沿襲,請將 DATAPROC_LINEAGE_ENABLED
中繼資料設為 false
。
在叢集層級啟用 Spark 資料歷程
建立叢集時,您可以啟用 Spark 資料歷程,這樣提交至叢集的所有支援 Spark 工作都會啟用資料歷程。
在叢集層級啟用 Spark 資料歷程
如要在叢集上啟用 Spark 資料沿襲,請建立 Dataproc 叢集,並將 dataproc:dataproc.lineage.enabled
叢集屬性設為 true
。
2.0 版映像檔叢集:需要 Dataproc 叢集 VM 存取權cloud-platform
範圍,才能使用 Spark 資料沿襲功能。使用 2.1 以上版本建立的 Dataproc 映像檔版本叢集,會啟用 cloud-platform
。如果您在建立叢集時指定 Dataproc 映像檔版本 2.0
,請將範圍設為 cloud-platform
。
gcloud CLI 範例:
gcloud dataproc clusters create CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--properties 'dataproc:dataproc.lineage.enabled=true'
在工作中停用 Spark 資料歷程
如果在叢集層級啟用 Spark 資料沿襲,提交工作時傳遞具有空值 ("") 的 spark.extraListeners
屬性,即可停用特定工作的 Spark 資料沿襲。
啟用後,您就無法在叢集上停用 Spark 資料沿襲。如要清除所有叢集作業的 Spark 資料沿襲,可以重新建立叢集,但不要使用 dataproc:dataproc.lineage.enabled
屬性。
提交 Spark 工作
在啟用 Spark 資料沿襲功能後建立的 Dataproc 叢集上提交 Spark 工作時,Dataproc 會擷取資料沿襲資訊,並向 Data Lineage API 回報。
gcloud dataproc jobs submit spark \
--cluster=CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME
注意:
- 新增
spark.openlineage.namespace
和spark.openlineage.appName
屬性 (用於識別工作) 是選用步驟。如未新增這些屬性,Dataproc 會使用下列預設值:spark.openlineage.namespace
的預設值:PROJECT_IDspark.openlineage.appName
的預設值:spark.app.name
在 Dataplex Universal Catalog 中查看歷程
歷程圖會顯示專案資源之間的關係,以及建立這些資源的程序。您可以在 Google Cloud 控制台中查看資料沿襲資訊,也可以從 Data Lineage API 擷取 JSON 格式的資料沿襲資訊。
PySpark 程式碼範例:
下列 PySpark 工作會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集的新資料表。並使用 Cloud Storage bucket 做為暫時儲存空間。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
請將下列項目改為對應的值:
BUCKET:現有 Cloud Storage bucket 的名稱。
PROJECT_ID、DATASET 和 TABLE:插入您的專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)。
您可以在 Dataplex Universal Catalog UI 中查看歷程圖。
後續步驟
- 進一步瞭解資料歷程。