使用 Cloud Composer 執行 Dataproc Serverless 工作負載

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何使用 Cloud Composer 2 在Google Cloud上執行 Dataproc 無伺服器服務工作負載。

以下各節的範例說明如何使用運算子管理 Dataproc Serverless 批次工作負載。您可以在建立、刪除、列出及取得 Dataproc Serverless Spark 批次工作負載的 DAG 中使用這些運算子:

事前準備

  1. 啟用 Dataproc API:

    主控台

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. 選取批次工作負載檔案的位置。您可以使用下列任一選項:

    • 建立 Cloud Storage 值區來儲存這個檔案。
    • 使用環境的值區。由於您不需要將此檔案與 Airflow 同步,因此可以在 /dags/data 資料夾外建立個別的子資料夾。例如:/batches
    • 使用現有值區。

設定檔案和 Airflow 變數

本節將示範如何設定檔案和 Airflow 變數,以便進行本教學課程。

將 Dataproc Serverless Spark ML 工作負載檔案上傳至值區

本教學課程的工作負載會執行 pyspark 指令碼:

  1. 將任何 pyspark 指令碼儲存至名為 spark-job.py 的本機檔案。例如,您可以使用範例 pyspark 指令碼

  2. 將檔案上傳至您在開始前所選位置。

設定 Airflow 變數

以下各節的範例會使用 Airflow 變數。您可以在 Airflow 中為這些變數設定值,然後 DAG 程式碼就能存取這些值。

本教學課程的範例會使用下列 Airflow 變數。您可以視需要設定這些值,具體取決於您使用的範例。

設定下列 Airflow 變數,以便在 DAG 程式碼中使用:

  • project_id專案 ID
  • bucket_name:工作負載 (spark-job.py) 主要 Python 檔案所在的儲存桶 URI。您在事前準備中選取了這個位置。
  • phs_cluster:永久記錄伺服器叢集名稱。您可以在建立永久記錄伺服器時設定這個變數。
  • image_name:自訂容器映像檔 (image:tag) 的名稱和標記。您在使用 DataprocCreateBatchOperator 時使用自訂容器映像檔時,請設定這個變數。
  • metastore_cluster:Dataproc Metastore 服務名稱。使用 DataprocCreateBatchOperator 使用 Dataproc Metastore 服務時,您會設定這個變數。
  • region_name:Dataproc Metastore 服務所在的區域。使用 DataprocCreateBatchOperator 使用 Dataproc Metastore 服務時,您會設定這個變數。

使用 Google Cloud 控制台和 Airflow 使用者介面設定各個 Airflow 變數

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境的「Airflow」連結。Airflow UI 會隨即開啟。

  3. 在 Airflow UI 中,依序選取「管理」>「變數」

  4. 按一下「新增記錄」

  5. 在「Key」欄位中指定變數名稱,並在「Val」欄位中設定變數值。

  6. 按一下 [儲存]

建立永久記錄伺服器

使用永久記錄伺服器 (PHS) 查看批次工作負載的 Spark 記錄檔案:

  1. 建立永久記錄伺服器
  2. 請確認您已在 phs_cluster Airflow 變數中指定 PHS 叢集的名稱。

DataprocCreateBatchOperator

下列 DAG 會啟動 Dataproc Serverless Batch 工作負載。

如要進一步瞭解 DataprocCreateBatchOperator 引數,請參閱運算子的原始碼

如要進一步瞭解可在 DataprocCreateBatchOperatorbatch 參數中傳遞的屬性,請參閱 Batch 類別的說明


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

搭配 DataprocCreateBatchOperator 使用自訂容器映像檔

以下範例說明如何使用自訂容器映像檔來執行工作負載。舉例來說,您可以使用自訂容器,新增預設容器映像檔未提供的 Python 依附元件。

如要使用自訂容器映像檔,請按照下列步驟操作:

  1. 建立自訂容器映像檔,並上傳至 Container Registry

  2. image_name Airflow 變數中指定圖片。

  3. 搭配自訂映像檔使用 DataprocCreateBatchOperator:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

搭配 DataprocCreateBatchOperator 使用 Dataproc Metastore 服務

如要在 DAG 中使用 Dataproc Metastore 服務,請按照下列步驟操作:

  1. 確認 Metastore 服務是否已啟動。

    如要瞭解如何啟動中繼資料服務,請參閱「啟用及停用 Dataproc Metastore」。

    如要進一步瞭解用於建立設定的批次運算子,請參閱 PeripheralsConfig

  2. 當 Metastore 服務啟動並運作後,請在 metastore_cluster 變數中指定其名稱,並在 region_name Airflow 變數中指定其區域。

  3. 在 DataprocCreateBatchOperator 中使用中繼存放區服務:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

您可以使用 DataprocDeleteBatchOperator,根據工作負載的批次 ID 刪除批次。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator 會列出指定 project_id 和區域中的批次。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator 會擷取特定批次工作負載。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

後續步驟