Dataplex Universal Catalog 支援排定自訂程式碼執行作業,可一次性執行、定期執行或按需執行。隨選功能目前處於預覽階段,且只能透過 API 使用。您可以使用 Spark (Java)、PySpark (僅限 Spark 3.2 版) 或 Spark SQL 排定客戶資料轉換作業。Dataplex 通用目錄會使用無伺服器 Spark 處理程序和內建的無伺服器排程器執行程式碼。
術語
- 工作
- Dataplex Universal Catalog 工作代表您希望 Dataplex Universal Catalog 依時執行的工作。它會封裝您的程式碼、參數和排程。
- 工作
工作代表 Dataplex 通用目錄任務的單次執行作業。舉例來說,如果工作排定每天執行,Dataplex Universal Catalog 就會每天建立工作。
如果工作是在 2023 年 5 月 10 日當天或之後建立,則「觸發條件」欄位會顯示工作的執行觸發條件類型。
以下是工作執行觸發事件類型:
RUN_REQUEST:表示因呼叫
RunTask
API 而執行工作。TASK_CONFIG:表示工作是因工作
TriggerSpec
設定而執行。
排程模式
Dataplex Universal Catalog 支援下列排程模式:
- 執行一次
- 使用這個模式只執行一次工作。您可以選擇立即執行,或在日後的特定時間執行。即使立即執行工作,執行作業仍可能需要最多兩分鐘的時間才能開始。
- 依排程執行
- 使用這個模式,以重複頻率執行工作。支援的重複週期包括每日、每週、每月或自訂。
- 隨選執行
使用這個模式,可視需要執行先前建立的工作。只有
RunTask
API 支援按需執行模式。當工作按需執行時,Dataplex Universal Catalog 會使用現有參數建立工作。您可以指定ExecutionSpec
引數和標籤來執行工作。
事前準備
啟用 Dataproc API。
為網路和子網路啟用私人 Google 存取權。在搭配 Dataplex 通用目錄工作使用的網路上啟用 Private Google Access。如果您在建立 Dataplex 通用目錄工作時未指定網路或子網路,Dataplex 通用目錄會使用預設子網路,您必須為預設子網路啟用私人 Google 存取權。
建立服務帳戶。如要排定任何 Dataplex 通用目錄工作,必須使用服務帳戶。服務帳戶必須隸屬於執行工作任務的專案。服務帳戶必須具備下列權限:
存取正在處理的 BigQuery 和 Cloud Storage 資料。
您執行工作所在專案的 Dataproc 工作站角色權限。
如果工作需要讀取或更新連結至資料湖的 Dataproc Metastore 執行個體,服務帳戶就需要具備 Dataproc Metastore 檢視者或編輯者角色。您必須在設定 Dataplex 通用目錄資料湖的專案中,授予這個角色。
如果工作是 Spark SQL 工作,您必須授予服務帳戶 Dataplex 通用目錄開發人員角色。您必須在設定 Dataplex 通用目錄湖的專案中授予這個角色。
如果工作是 Spark SQL 工作,您需要在寫入結果的值區中具備 Cloud Storage 管理員權限。
如要排程及執行 Spark SQL 和自訂 Spark 工作,您必須在服務帳戶中獲得 Dataplex 通用目錄中繼資料讀取者 (
roles/dataplex.metadataReader
)、Dataplex 通用目錄檢視者 (roles/dataplex.viewer
) 和 Dataproc Metastore 中繼資料使用者 (roles/metastore.metadataUser
) 的 IAM 角色。
將服務帳戶的「服務帳戶使用者」角色 (
roles/iam.serviceAccountUser
) 授予提交工作內容的使用者。如需操作說明,請參閱「管理服務帳戶的存取權」。授予 Dataplex 通用目錄湖服務帳戶使用服務帳戶的權限。您可以在 Google Cloud 控制台的「湖泊詳細資料」頁面中,找到 Dataplex 通用目錄湖泊服務帳戶。
如果包含 Dataplex 通用目錄湖的專案與要執行工作的工作專案不同,請在執行工作的工作專案中,將 Dataproc 編輯者角色授予 Dataplex 通用目錄湖服務帳戶。
將必要的程式碼構件 (JAR、Python 或 SQL 指令碼檔案) 或封存檔案 (
.jar
、.tar
、.tar.gz
、.tgz
、.zip
) 放在 Cloud Storage 路徑中。請確認服務帳戶具備存放這些程式碼構件的 Cloud Storage 值區的必要
storage.objects.get
權限。
安排 Spark (Java 或 Python) 任務
控制台
在 Google Cloud 控制台中,前往 Dataplex 通用目錄頁面。
前往「Process」檢視畫面。
按一下「建立工作」。
針對「Create Custom Spark Task」(建立自訂 Spark 工作),按一下「Create task」(建立工作)。
選擇 Dataplex 通用目錄資料湖。
提供任務名稱。
為工作建立ID。
在「工作設定」部分的「類型」中,選取「Spark」或「PySpark」。
輸入相關引數。
在「Service account」欄位中,輸入自訂 Spark 工作可執行的使用者服務帳戶。
按一下「繼續」。
選用:設定排程:選取「Run once」或「Repeat」。填寫必填欄位。
按一下「繼續」。
選用步驟:自訂資源和新增其他設定。
按一下 [建立]。
gcloud
您可以使用 gcloud CLI 指令排定 Spark (Java / Python) 工作。下表列出必須使用的參數和選用參數:
參數 | 說明 |
---|---|
--lake |
Dataplex Universal Catalog 服務的湖泊資源湖泊 ID。 |
--location |
Dataplex 通用目錄服務的位置。 |
--spark-main-class |
驅動程式的主類別。包含類別的 jar 檔案必須位於預設 CLASSPATH 中。 |
--spark-main-jar-file-uri |
包含主要類別的 jar 檔案的 Cloud Storage URI。 |
--spark-archive-uris |
選用:要擷取至各執行程式工作目錄的封存檔 Cloud Storage URI。支援的檔案類型:.jar 、.tar 、.tar.gz 、.tgz 和 .zip 。 |
--spark-file-uris |
選用:要遷入各項執行程式工作目錄的檔案 Cloud Storage URI。 |
--batch-executors-count |
選用:工作執行器的總數。預設值為 2。 |
--batch-max-executors-count |
選用:可設定執行緒的數量上限。預設值為 1000。如果 batch-max-executors-count 大於 batch-executors-count ,則 Dataplex 通用目錄會啟用自動調整大小功能。 |
--container-image-java-jars |
選用:要加入至 classpath 的 Java JARS 清單。有效的輸入內容包括 Jar 二進位檔的 Cloud Storage URI。 例如 gs://bucket-name/my/path/to/file.jar 。 |
--container-image-properties |
選用:以 prefix:property 格式指定的屬性鍵。例如 core:hadoop.tmp.dir 。詳情請參閱「叢集屬性」。 |
--vpc-network-tags |
選用:要套用至工作中的網路標記清單。 |
--vpc-network-name |
選用:工作執行的虛擬私有雲網路。根據預設,Dataplex 通用目錄會使用專案中名為 Default 的虛擬私有雲網路。請務必只使用 --vpc-network-name 或 --vpc-sub-network-name 其中一個。 |
--vpc-sub-network-name |
選用:工作執行的 VPC 子網路。 您只能使用 --vpc-sub-network-name 或 --vpc-network-name 其中一個。 |
--trigger-type |
使用者指定工作的觸發條件類型。值必須是下列其中一個:ON_DEMAND - 任務建立後不久就會執行一次。RECURRING - 工作會依排程定期執行。 |
--trigger-start-time |
選填:工作首次執行的時間。格式為 `{year}-{month}-{day}T{hour}:{min}:{sec}Z`,其中時區為 UTC。舉例來說,「2017-01-15T01:30:00Z」會將 2017 年 1 月 15 日 01:30 世界標準時間編碼為 如果未指定這個值,則觸發條件類型為 ON_DEMAND 時,工作會在提交後執行;如果觸發條件類型為 RECURRING ,則會在指定的時間表上執行。 |
--trigger-disabled |
選用:可防止工作執行。這個參數不會取消已在執行的工作,而是暫時停用 RECURRING 工作。 |
--trigger-max-retires |
選用:取消前重試的次數。將值設為零,即可一律不嘗試重試失敗的工作。 |
--trigger-schedule |
Cron 時程:用於定期執行工作。 |
--description |
選用:工作說明。 |
--display-name |
選用:工作顯示名稱。 |
--labels |
選用:要新增的標籤 KEY=VALUE 組合清單。 |
--execution-args |
選用:要傳遞至工作中的引數。引數可以是鍵/值組合的混合。您可以將以半形逗號分隔的鍵/值組合清單,做為執行引數傳遞。如要傳遞位置引數,請將鍵設為 TASK_ARGS ,並將值設為以半形逗號分隔的字串,內含所有位置引數。如要使用半形逗號以外的界限符號,請參閱轉義。如果 key-value 和位置引數一併傳遞,則 TASK_ARGS 會做為最後一個引數傳遞。 |
--execution-service-account |
用於執行工作的服務帳戶。 |
--max-job-execution-lifetime |
選用:工作執行作業到期前的時間上限。 |
--container-image |
選用:工作執行階段環境的自訂容器映像檔。如未指定,系統會使用預設容器映像檔。 |
--kms-key |
選用:用於加密的 Cloud KMS 金鑰,格式如下:projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}
|
Java 範例:
glcoud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND –spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value3,key3=value3 --execution-args=arg1=value1,arg2=value3,arg3=value3 <task-id>
PySpark 範例:
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>
REST
如要建立工作,請使用 API Explorer。
排定 Spark SQL 任務
gcloud
如要排定 Spark SQL 任務,請執行與排定 Spark (Java 或 Python) 任務相同的 gcloud CLI 指令,並使用下列額外參數:
參數 | 說明 |
---|---|
--spark-sql-script |
SQL 查詢文字。您必須使用 spark-sql-script 或 spark-sql-script-file 其中之一。 |
--spark-sql-script-file |
查詢檔案的參照。這個值可以是查詢檔案的 Cloud Storage URI,或 SQL 指令碼內容的路徑。您必須使用 spark-sql-script 或 spark-sql-script-file 其中之一。 |
--execution-args |
對於 Spark SQL 工作,下列引數為必填項目,且需要以位置引數傳遞:--output_location, <GCS uri of the output directory> --output_format, <output file format> 。支援的格式包括 CSV 檔案、JSON 檔案、Parquet 和 ORC。 |
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>
REST
如要建立工作,請使用 API Explorer。
監控工作
控制台
在 Google Cloud 控制台中,前往 Dataplex 通用目錄頁面:
前往「Process」檢視畫面。
「Tasks」 分頁會顯示工作清單,並依工作範本類型篩選。
在「名稱」欄中,按一下要查看的任務。
按一下要查看的工作的「Job ID」。
系統會在Google Cloud 控制台中開啟 Dataproc 頁面,讓您查看監控和輸出詳細資料。
gcloud
下表列出用於監控工作內容的 gcloud CLI 指令。
動作 | gcloud CLI 指令 |
---|---|
列出工作 | gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id> |
查看工作詳細資料 | gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id> |
列出工作的工作 | gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> |
查看工作詳細資料 | gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id> |
Dataplex 通用目錄會在 Dataproc Serverless 上執行工作 (批次)。如要查看 Dataplex 通用目錄工作項目的執行記錄,請按照下列步驟操作:
取得 Dataproc Serverless (批次) 工作 ID。執行下列指令:
gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
查看記錄。使用執行上一個指令後取得的工作 ID,執行下列指令:
gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
REST
管理時間表
您可以在 Google Cloud 主控台的 Dataplex 通用目錄中,編輯工作排程、刪除工作或取消進行中的工作。下表列出這些動作的 gcloud CLI 指令。
動作 | gcloud CLI 指令 |
---|---|
編輯工作排程 | gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id> |
刪除工作 | gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id> |
取消工作 | gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id> |
後續步驟
- 請參閱「Dataproc 範本」。
- 試試使用預先建立的範本,逐步將資料從 Dataplex 通用目錄 Cloud Storage 資產移至 BigQuery。
- 請參閱「設定 Dataplex 通用目錄工作專案的快訊和通知」。