Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何使用 Cloud Composer 2 在Google Cloud上執行 Dataproc 無伺服器服務工作負載。
以下各節的範例說明如何使用運算子管理 Dataproc Serverless 批次工作負載。您可以在建立、刪除、列出及取得 Dataproc Serverless Spark 批次工作負載的 DAG 中使用這些運算子:
為運算子建立 DAG,以便與 Dataproc Serverless Batch 批次工作負載搭配運作:
建立使用自訂容器和 Dataproc Metastore 的 DAG。
為這些 DAG 設定永久記錄伺服器。
事前準備
啟用 Dataproc API:
主控台
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
選取批次工作負載檔案的位置。您可以使用下列任一選項:
- 建立 Cloud Storage 值區來儲存這個檔案。
- 使用環境的值區。由於您不需要將此檔案與 Airflow 同步,因此可以在
/dags
或/data
資料夾外建立個別的子資料夾。例如:/batches
。 - 使用現有值區。
設定檔案和 Airflow 變數
本節將示範如何設定檔案和 Airflow 變數,以便進行本教學課程。
將 Dataproc Serverless Spark ML 工作負載檔案上傳至值區
本教學課程的工作負載會執行 pyspark 指令碼:
將任何 pyspark 指令碼儲存至名為
spark-job.py
的本機檔案。例如,您可以使用範例 pyspark 指令碼。
設定 Airflow 變數
以下各節的範例會使用 Airflow 變數。您可以在 Airflow 中為這些變數設定值,然後 DAG 程式碼就能存取這些值。
本教學課程的範例會使用下列 Airflow 變數。您可以視需要設定這些值,具體取決於您使用的範例。
設定下列 Airflow 變數,以便在 DAG 程式碼中使用:
project_id
:專案 ID。bucket_name
:工作負載 (spark-job.py
) 主要 Python 檔案所在的儲存桶 URI。您在事前準備中選取了這個位置。phs_cluster
:永久記錄伺服器叢集名稱。您可以在建立永久記錄伺服器時設定這個變數。image_name
:自訂容器映像檔 (image:tag
) 的名稱和標記。您在使用 DataprocCreateBatchOperator 時使用自訂容器映像檔時,請設定這個變數。metastore_cluster
:Dataproc Metastore 服務名稱。使用 DataprocCreateBatchOperator 使用 Dataproc Metastore 服務時,您會設定這個變數。region_name
:Dataproc Metastore 服務所在的區域。使用 DataprocCreateBatchOperator 使用 Dataproc Metastore 服務時,您會設定這個變數。
使用 Google Cloud 控制台和 Airflow 使用者介面設定各個 Airflow 變數
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境的「Airflow」連結。Airflow UI 會隨即開啟。
在 Airflow UI 中,依序選取「管理」>「變數」。
按一下「新增記錄」。
在「Key」欄位中指定變數名稱,並在「Val」欄位中設定變數值。
按一下 [儲存]。
建立永久記錄伺服器
使用永久記錄伺服器 (PHS) 查看批次工作負載的 Spark 記錄檔案:
- 建立永久記錄伺服器。
- 請確認您已在
phs_cluster
Airflow 變數中指定 PHS 叢集的名稱。
DataprocCreateBatchOperator
下列 DAG 會啟動 Dataproc Serverless Batch 工作負載。
如要進一步瞭解 DataprocCreateBatchOperator
引數,請參閱運算子的原始碼。
如要進一步瞭解可在 DataprocCreateBatchOperator
的 batch
參數中傳遞的屬性,請參閱 Batch 類別的說明。
搭配 DataprocCreateBatchOperator 使用自訂容器映像檔
以下範例說明如何使用自訂容器映像檔來執行工作負載。舉例來說,您可以使用自訂容器,新增預設容器映像檔未提供的 Python 依附元件。
如要使用自訂容器映像檔,請按照下列步驟操作:
在
image_name
Airflow 變數中指定圖片。搭配自訂映像檔使用 DataprocCreateBatchOperator:
搭配 DataprocCreateBatchOperator 使用 Dataproc Metastore 服務
如要在 DAG 中使用 Dataproc Metastore 服務,請按照下列步驟操作:
確認 Metastore 服務是否已啟動。
如要瞭解如何啟動中繼資料服務,請參閱「啟用及停用 Dataproc Metastore」。
如要進一步瞭解用於建立設定的批次運算子,請參閱 PeripheralsConfig。
當 Metastore 服務啟動並運作後,請在
metastore_cluster
變數中指定其名稱,並在region_name
Airflow 變數中指定其區域。在 DataprocCreateBatchOperator 中使用中繼存放區服務:
DataprocDeleteBatchOperator
您可以使用 DataprocDeleteBatchOperator,根據工作負載的批次 ID 刪除批次。
DataprocListBatchesOperator
DataprocDeleteBatchOperator 會列出指定 project_id 和區域中的批次。
DataprocGetBatchOperator
DataprocGetBatchOperator 會擷取特定批次工作負載。